Commit fcc3efd9 by y1sa

新增事件机制

parent 83b44f45
...@@ -39,4 +39,6 @@ build/ ...@@ -39,4 +39,6 @@ build/
target/ target/
*.dat *.dat
\ No newline at end of file
.idea/
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/common/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/common/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/core/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/core/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/mysql-cdc/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/mysql-cdc/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/oracle-cdc/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/oracle-cdc/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/sqlserver-cdc/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/sqlserver-cdc/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
<option name="ignoredFiles">
<set>
<option value="$PROJECT_DIR$/common/pom.xml" />
</set>
</option>
</component>
<component name="ProjectInspectionProfilesVisibleTreeState">
<entry key="Project Default">
<profile-state>
<expanded-state>
<State />
<State>
<id>CodePlugin DevKit</id>
</State>
<State>
<id>GeneralJavaScript and TypeScript</id>
</State>
<State>
<id>JavaScript and TypeScript</id>
</State>
<State>
<id>Kotlin</id>
</State>
<State>
<id>Plugin DevKit</id>
</State>
<State>
<id>Plugin descriptorPlugin DevKit</id>
</State>
<State>
<id>Style issuesKotlin</id>
</State>
</expanded-state>
<selected-state>
<State>
<id>Angular</id>
</State>
</selected-state>
</profile-state>
</entry>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
...@@ -35,57 +35,65 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -35,57 +35,65 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
private volatile Thread consumerThread; private volatile Thread consumerThread;
private ChangeHandler handler;
private volatile boolean running = true; private volatile boolean running = true;
public interface ChangeHandler { private volatile List<ChangeListener> changeListeners = new ArrayList<>();
void process(ChangedRecord record) throws Exception;
}
@SuppressWarnings("unchecked") public void addListener(ChangeListener listener) {
public T changeHandler(ChangeHandler handler) { ArrayList<ChangeListener> changeListeners = new ArrayList<>(this.changeListeners);
this.handler = handler; changeListeners.add(listener);
return (T) this; this.changeListeners = changeListeners;
} }
public void start() { public void start() {
synchronized (startupStopMonitor) { synchronized (startupStopMonitor) {
ChangeHandler handler0 = this.handler; this.running = true;
if (handler0 == null) {
throw new IllegalStateException("recordConsumer not set");
}
consumerThread = new Thread(() -> { consumerThread = new Thread(() -> {
List<ChangeListener> changeListeners = null;
while (running) { while (running) {
changeListeners = this.changeListeners;
try { try {
List<ChangedRecord> buffer = new ArrayList<>(); List<ChangedRecord> buffer = new ArrayList<>();
recordQueue.drainTo(buffer); recordQueue.drainTo(buffer);
if (buffer.isEmpty()) { if (buffer.isEmpty()) {
ChangedRecord record = recordQueue.take(); ChangedRecord record = recordQueue.take();
try { try {
handler0.process(record); for (ChangeListener listener : changeListeners) {
if (listener.supports(record)) {
listener.onChange(record);
}
}
} catch (Exception e) { } catch (Exception e) {
logger.error("处理{}时发生了异常", record, e); logger.error("处理{}时发生了异常", record, e);
} }
} else { } else {
for (ChangedRecord record : buffer) { for (ChangedRecord record : buffer) {
try { try {
handler0.process(record); for (ChangeListener listener : changeListeners) {
if (listener.supports(record)) {
listener.onChange(record);
}
}
} catch (Exception e) { } catch (Exception e) {
logger.error("处理{}时发生了异常", record, e); logger.error("处理{}时发生了异常", record, e);
} }
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException ignored) {
} }
} }
// 消费完剩余的记录 // 消费完剩余的记录
List<ChangedRecord> buffer = new ArrayList<>(); List<ChangedRecord> buffer = new ArrayList<>();
recordQueue.drainTo(buffer); recordQueue.drainTo(buffer);
if (!buffer.isEmpty()) { if (!buffer.isEmpty()) {
changeListeners = this.changeListeners;
for (ChangedRecord record : buffer) { for (ChangedRecord record : buffer) {
try { try {
handler0.process(record); for (ChangeListener listener : changeListeners) {
if (listener.supports(record)) {
listener.onChange(record);
}
}
} catch (Exception e) { } catch (Exception e) {
logger.error("处理{}时发生了异常", record, e); logger.error("处理{}时发生了异常", record, e);
} }
......
package com.tbyf.cdcengine2.core;
public interface ChangeListener {
default boolean supports(ChangedRecord record) {
return true;
}
void onChange(ChangedRecord record);
}
package com.tbyf.cdcengine2.core;
public class JdbcConnInfo {
private String host;
private int port;
private String dbname;
private String url;
private String user;
private String password;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDbname() {
return dbname;
}
public void setDbname(String dbname) {
this.dbname = dbname;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
package com.tbyf.cdcengine2.oracle;
import com.tbyf.cdcengine2.core.JdbcConnInfo;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class OracleUtils {
private static final Pattern urlPattern = Pattern.compile("jdbc:oracle:thin:@([a-zA-Z0-9.]+):(\\d+):([a-zA-Z]+)");
public static JdbcConnInfo parseConnInfoFromUrl(String url) {
Matcher matcher = urlPattern.matcher(url);
if (matcher.find()) {
String host = matcher.group(1);
int port = Integer.parseInt(matcher.group(2));
String dbname = matcher.group(3);
JdbcConnInfo connInfo = new JdbcConnInfo();
connInfo.setUrl(url);
connInfo.setHost(host);
connInfo.setPort(port);
connInfo.setDbname(dbname);
return connInfo;
} else {
throw new IllegalArgumentException(url);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment