Commit 29f7a9ad by y1sa

新增事件机制

parent fcc3efd9
...@@ -3,6 +3,7 @@ package com.tbyf.cdcengine2.core; ...@@ -3,6 +3,7 @@ package com.tbyf.cdcengine2.core;
import io.debezium.embedded.Connect; import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.relational.history.FileDatabaseHistory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -14,9 +15,12 @@ import java.util.concurrent.ArrayBlockingQueue; ...@@ -14,9 +15,12 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
private final static AtomicLong nextConsumerId = new AtomicLong(0);
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final Object startupStopMonitor = new Object(); private final Object startupStopMonitor = new Object();
...@@ -39,6 +43,12 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -39,6 +43,12 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
private volatile List<ChangeListener> changeListeners = new ArrayList<>(); private volatile List<ChangeListener> changeListeners = new ArrayList<>();
public AbstractCdcEngine() {
debeziumProps.setProperty(Constants.DATABASE_SERVER_NAME_PROP, Constants.DATABASE_SERVER_NAME_VALUE);
debeziumProps.setProperty(Constants.DATABASE_HISTORY_PROP, FileDatabaseHistory.class.getName());
debeziumProps.setProperty("name", getClass().getSimpleName());
}
public void addListener(ChangeListener listener) { public void addListener(ChangeListener listener) {
ArrayList<ChangeListener> changeListeners = new ArrayList<>(this.changeListeners); ArrayList<ChangeListener> changeListeners = new ArrayList<>(this.changeListeners);
changeListeners.add(listener); changeListeners.add(listener);
...@@ -47,6 +57,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -47,6 +57,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
public void start() { public void start() {
synchronized (startupStopMonitor) { synchronized (startupStopMonitor) {
logger.info("开始启动CDC引擎...");
this.running = true; this.running = true;
consumerThread = new Thread(() -> { consumerThread = new Thread(() -> {
List<ChangeListener> changeListeners = null; List<ChangeListener> changeListeners = null;
...@@ -80,6 +91,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -80,6 +91,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
} }
} }
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
break;
} }
} }
// 消费完剩余的记录 // 消费完剩余的记录
...@@ -100,7 +112,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -100,7 +112,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
} }
} }
}, "cdc-consumer"); }, "cdc-consumer-" + nextConsumerId.getAndIncrement());
consumerThread.start(); consumerThread.start();
debeziumEngine = DebeziumEngine.create(Connect.class) debeziumEngine = DebeziumEngine.create(Connect.class)
...@@ -131,6 +143,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -131,6 +143,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
public void stop() { public void stop() {
synchronized (startupStopMonitor) { synchronized (startupStopMonitor) {
logger.info("开始关闭CDC引擎...");
running = false; running = false;
DebeziumEngine<?> debeziumEngine = this.debeziumEngine; DebeziumEngine<?> debeziumEngine = this.debeziumEngine;
if (debeziumEngine != null) { if (debeziumEngine != null) {
...@@ -144,15 +157,16 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -144,15 +157,16 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
ExecutorService executor = this.executor; ExecutorService executor = this.executor;
if (executor != null) { if (executor != null) {
executor.shutdown(); executor.shutdownNow();
} }
this.executor = null; this.executor = null;
Thread consumerThread = this.consumerThread; Thread consumerThread = this.consumerThread;
if (consumerThread != null && consumerThread.isAlive()) { if (consumerThread != null) {
consumerThread.interrupt(); consumerThread.interrupt();
} }
this.consumerThread = null; this.consumerThread = null;
} }
} }
} }
...@@ -9,7 +9,6 @@ public class ChangedRecord { ...@@ -9,7 +9,6 @@ public class ChangedRecord {
private String schema; private String schema;
private String table; private String table;
private Map<String, Object> before; private Map<String, Object> before;
private Map<String, Object> now; private Map<String, Object> now;
private OperationType operationType; private OperationType operationType;
...@@ -77,7 +76,9 @@ public class ChangedRecord { ...@@ -77,7 +76,9 @@ public class ChangedRecord {
@Override @Override
public String toString() { public String toString() {
return "ChangedRecord{" + return "ChangedRecord{" +
"before=" + before + "schema='" + schema + '\'' +
", table='" + table + '\'' +
", before=" + before +
", now=" + now + ", now=" + now +
", operationType=" + operationType + ", operationType=" + operationType +
'}'; '}';
......
package com.tbyf.cdcengine2.core;
public class Constants {
public static final String DATABASE_SERVER_NAME_PROP = "database.server.name";
public static final String DATABASE_SERVER_NAME_VALUE = "server1";
public static final String CONNECTOR_CLASS_PROP = "connector.class";
public static final String DATABASE_HISTORY_PROP = "database.history";
public static final String DATABASE_HISTORY_FILE_FILENAME_PROP = "database.history.file.filename";
public static final String OFFSET_STORAGE_FILE_FILENAME_PROP = "offset.storage.file.filename";
public static final String SCHEMA_INCLUDE_LIST_PROP = "schema.include.list";
public static final String TABLE_INCLUDE_LIST_PROP = "table.include.list";
}
...@@ -37,8 +37,8 @@ public class DefaultChangeEventAdapter implements ChangeEventAdapter { ...@@ -37,8 +37,8 @@ public class DefaultChangeEventAdapter implements ChangeEventAdapter {
String destination = event.destination(); String destination = event.destination();
String schemaTable = destination.substring(destination.indexOf(".") + 1); String schemaTable = destination.substring(destination.indexOf(".") + 1);
String[] split = schemaTable.split("\\."); String[] split = schemaTable.split("\\.");
record.schema(split[0]) record.schema(split[0]);
.table(split[1]); record.table(split[1]);
Struct before = value.getStruct(valueSchema.field(Envelope.FieldName.BEFORE).name()); Struct before = value.getStruct(valueSchema.field(Envelope.FieldName.BEFORE).name());
Struct now = value.getStruct(valueSchema.field(Envelope.FieldName.AFTER).name()); Struct now = value.getStruct(valueSchema.field(Envelope.FieldName.AFTER).name());
String op = value.getString(valueSchema.field(Envelope.FieldName.OPERATION).name()); String op = value.getString(valueSchema.field(Envelope.FieldName.OPERATION).name());
......
package com.tbyf.cdcengine2.oracle; package com.tbyf.cdcengine2.oracle;
import com.tbyf.cdcengine2.core.AbstractCdcEngine; import com.tbyf.cdcengine2.core.AbstractCdcEngine;
import com.tbyf.cdcengine2.core.ChangeEventAdapter; import com.tbyf.cdcengine2.core.ChangeListener;
import com.tbyf.cdcengine2.core.DefaultChangeEventAdapter; import com.tbyf.cdcengine2.core.ChangedRecord;
import com.tbyf.cdcengine2.core.Constants;
import io.debezium.connector.oracle.OracleConnector; import io.debezium.connector.oracle.OracleConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -16,24 +13,20 @@ import java.util.stream.Collectors; ...@@ -16,24 +13,20 @@ import java.util.stream.Collectors;
public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> { public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> {
static { static {
try { OracleUtils.loadDriver();
Class.forName("oracle.jdbc.driver.OracleDriver");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("未找到Oracle jdbc驱动", e);
}
} }
public OracleCdcEngine(OracleCdcEngineProperties props) { public OracleCdcEngine(OracleCdcEngineProperties props) {
debeziumProps.setProperty("connector.class", OracleConnector.class.getName()); debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, OracleConnector.class.getName());
debeziumProps.setProperty("database.history", FileDatabaseHistory.class.getName());
debeziumProps.setProperty("database.history.file.filename", props.namespace() + "_his.dat");
debeziumProps.setProperty("offset.storage.file.filename", props.namespace() + "_off.dat");
debeziumProps.setProperty("database.hostname", props.host()); debeziumProps.setProperty(Constants.DATABASE_HISTORY_FILE_FILENAME_PROP, props.getNamespace() + "_his.dat");
debeziumProps.setProperty("database.port", String.valueOf(props.port())); debeziumProps.setProperty(Constants.OFFSET_STORAGE_FILE_FILENAME_PROP, props.getNamespace() + "_off.dat");
debeziumProps.setProperty("database.user", props.username());
debeziumProps.setProperty("database.password", props.password()); debeziumProps.setProperty("database.hostname", props.getHost());
debeziumProps.setProperty("database.dbname", props.dbname()); debeziumProps.setProperty("database.port", String.valueOf(props.getPort()));
debeziumProps.setProperty("database.user", props.getUsername());
debeziumProps.setProperty("database.password", props.getPassword());
debeziumProps.setProperty("database.dbname", props.getDbname());
debeziumProps.setProperty("log.mining.strategy", "online_catalog"); debeziumProps.setProperty("log.mining.strategy", "online_catalog");
debeziumProps.setProperty("snapshot.mode", "schema_only"); debeziumProps.setProperty("snapshot.mode", "schema_only");
...@@ -41,35 +34,21 @@ public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> { ...@@ -41,35 +34,21 @@ public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> {
debeziumProps.setProperty("database.history.skip.unparseable.ddl", Boolean.toString(true)); debeziumProps.setProperty("database.history.skip.unparseable.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.history.store.only.captured.tables.ddl", Boolean.toString(true)); debeziumProps.setProperty("database.history.store.only.captured.tables.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.server.name", "server1"); String schema = props.getUsername().toUpperCase();
debeziumProps.setProperty("name", "oracle_cdc_engine"); debeziumProps.setProperty(Constants.SCHEMA_INCLUDE_LIST_PROP, schema);
String schema = props.username().toUpperCase();
debeziumProps.setProperty("schema.include.list", schema);
String tables = new HashSet<>(Arrays.asList(props.cdcTables().toUpperCase().split(","))) String capturedTables = new HashSet<>(Arrays.asList(props.getCapturedTables().toUpperCase().split(",")))
.stream().map(tableName -> schema + "." + tableName) .stream().map(tableName -> schema + "." + tableName)
.collect(Collectors.joining(",")); .collect(Collectors.joining(","));
debeziumProps.setProperty("table.include.list", tables); debeziumProps.setProperty(Constants.TABLE_INCLUDE_LIST_PROP, capturedTables);
// 开启supplemental logging // 开启supplemental logging
String url = buildOracleJdbcUrl(props.host(), props.port(), props.dbname()); String url = OracleUtils.buildOracleJdbcUrl(props.getHost(), props.getPort(), props.getDbname());
SupplementalLoggingUtils.enableDBSupplementalLogging(getConn(url, props.username(), props.password())); SupplementalLoggingUtils.enableDBSupplementalLogging(OracleUtils.getConn(url, props.getUsername(), props.getPassword()));
SupplementalLoggingUtils.enableSchemaTablesSupplementalLogging(getConn(url, props.username(), props.password()), props.username(), SupplementalLoggingUtils.enableSchemaTablesSupplementalLogging(OracleUtils.getConn(url, props.getUsername(),
new HashSet<>(Arrays.asList(props.cdcTables().split(",")))); props.getPassword()), schema,
} new HashSet<>(Arrays.asList(props.getCapturedTables().split(","))));
private static Connection getConn(String url, String username, String password) {
try {
return DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
throw new IllegalStateException("获取数据库连接失败", e);
}
}
public static String buildOracleJdbcUrl(String host, int port, String dbname) {
return String.format("jdbc:oracle:thin:@%s:%d:%s", host, port, dbname);
} }
} }
...@@ -2,77 +2,69 @@ package com.tbyf.cdcengine2.oracle; ...@@ -2,77 +2,69 @@ package com.tbyf.cdcengine2.oracle;
public class OracleCdcEngineProperties { public class OracleCdcEngineProperties {
private String host; private String host = "localhost";
private int port; private int port = 1521;
private String dbname; private String dbname;
private String username; private String username;
private String password; private String password;
private String cdcTables; private String capturedTables;
private String namespace; private String namespace = "default";
public OracleCdcEngineProperties namespace(String namespace) { public String getHost() {
this.namespace = namespace; return host;
return this;
} }
public String namespace() { public void setHost(String host) {
if (namespace == null) { this.host = host;
return "default";
}
return namespace;
} }
public OracleCdcEngineProperties host(String host) { public int getPort() {
this.host = host; return port;
return this;
} }
public OracleCdcEngineProperties port(int port) { public void setPort(int port) {
this.port = port; this.port = port;
return this;
} }
public OracleCdcEngineProperties dbname(String dbname) { public String getDbname() {
this.dbname = dbname; return dbname;
return this;
} }
public OracleCdcEngineProperties username(String username) { public void setDbname(String dbname) {
this.username = username; this.dbname = dbname;
return this;
} }
public OracleCdcEngineProperties password(String password) { public String getUsername() {
this.password = password; return username;
return this;
} }
public OracleCdcEngineProperties cdcTables(String cdcTables) { public void setUsername(String username) {
this.cdcTables = cdcTables; this.username = username;
return this;
} }
public String host() { public String getPassword() {
return host; return password;
} }
public int port() { public void setPassword(String password) {
return port; this.password = password;
} }
public String dbname() { public String getCapturedTables() {
return dbname; return capturedTables;
} }
public String username() { public void setCapturedTables(String capturedTables) {
return username; this.capturedTables = capturedTables;
} }
public String password() { public String getNamespace() {
return password; return namespace;
} }
public String cdcTables() { public void setNamespace(String namespace) {
return cdcTables; this.namespace = namespace;
} }
} }
...@@ -2,6 +2,9 @@ package com.tbyf.cdcengine2.oracle; ...@@ -2,6 +2,9 @@ package com.tbyf.cdcengine2.oracle;
import com.tbyf.cdcengine2.core.JdbcConnInfo; import com.tbyf.cdcengine2.core.JdbcConnInfo;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
...@@ -26,4 +29,24 @@ public class OracleUtils { ...@@ -26,4 +29,24 @@ public class OracleUtils {
throw new IllegalArgumentException(url); throw new IllegalArgumentException(url);
} }
} }
public static String buildOracleJdbcUrl(String host, int port, String dbname) {
return String.format("jdbc:oracle:thin:@%s:%d:%s", host, port, dbname);
}
public static Connection getConn(String url, String username, String password) {
try {
return DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
throw new IllegalStateException("获取数据库连接失败", e);
}
}
public static void loadDriver() {
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("未找到Oracle jdbc驱动", e);
}
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment