Commit 4e15e330 by y1sa

mysql CDC实现

parent 9b8b1a66
......@@ -51,12 +51,10 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
debeziumProps.setProperty("snapshot.mode", "schema_only");
debeziumProps.setProperty("database.history.skip.unparseable.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.history.store.only.captured.tables.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.hostname", props.getHost());
debeziumProps.setProperty("database.port", String.valueOf(props.getPort()));
debeziumProps.setProperty("database.user", props.getUsername());
debeziumProps.setProperty("database.password", props.getPassword());
debeziumProps.setProperty(Constants.DATABASE_SERVER_NAME_PROP, Constants.DATABASE_SERVER_NAME_VALUE);
debeziumProps.setProperty(Constants.DATABASE_HISTORY_PROP, FileDatabaseHistory.class.getName());
debeziumProps.setProperty("name", getClass().getSimpleName());
......
......@@ -17,4 +17,18 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>core</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.tbyf.cdcengine2.mysql;
import com.tbyf.cdcengine2.core.AbstractCdcEngine;
import com.tbyf.cdcengine2.core.Constants;
import io.debezium.connector.mysql.MySqlConnector;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.Collectors;
public class MySqlCdcEngine extends AbstractCdcEngine<MySqlCdcEngine> {
static {
MySqlUtils.loadDriver();
}
public MySqlCdcEngine(MySqlCdcEngineProperties props) {
super(props);
debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, MySqlConnector.class.getName());
debeziumProps.setProperty(Constants.DATABASE_DBNAME_PROP, props.getDbname());
String dbname = props.getDbname();
debeziumProps.setProperty("database.serverTimezone", "Asia/Shanghai");
String capturedTables = new HashSet<>(Arrays.asList(props.getCapturedTables().toUpperCase().split(",")))
.stream().map(tableName -> dbname + "." + tableName)
.collect(Collectors.joining(","));
debeziumProps.setProperty(Constants.TABLE_INCLUDE_LIST_PROP, capturedTables);
}
public static void main(String[] args) {
MySqlCdcEngineProperties props = new MySqlCdcEngineProperties();
props.setHost("localhost");
props.setPort(3306);
props.setUsername("root");
props.setPassword("root");
props.setDbname("easychat");
props.setCapturedTables("app_update");
new MySqlCdcEngine(props)
.onChange(record -> {
System.out.println(record);
}).start();
}
}
package com.tbyf.cdcengine2.mysql;
import com.tbyf.cdcengine2.core.BaseCdcEngineProperties;
public class MySqlCdcEngineProperties extends BaseCdcEngineProperties {
private String dbname;
private String capturedTables;
public String getDbname() {
return dbname;
}
public void setDbname(String dbname) {
this.dbname = dbname;
}
public String getCapturedTables() {
return capturedTables;
}
public void setCapturedTables(String capturedTables) {
this.capturedTables = capturedTables;
}
}
package com.tbyf.cdcengine2.mysql;
public class MySqlUtils {
public static void loadDriver() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("未找到mysql jdbc驱动", e);
}
}
public static String buildUrl(String host, int port, String database) {
return "jdbc:mysql://" + host + ":" + port + "/" + database + "?serverTimezone=Asia/Shanghai";
}
}
package com.tbyf.cdcengine2.oracle;
import com.tbyf.cdcengine2.core.AbstractCdcEngine;
import com.tbyf.cdcengine2.core.ChangeListener;
import com.tbyf.cdcengine2.core.ChangedRecord;
import com.tbyf.cdcengine2.core.Constants;
import io.debezium.connector.oracle.OracleConnector;
......@@ -19,7 +17,6 @@ public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> {
public OracleCdcEngine(OracleCdcEngineProperties props) {
super(props);
debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, OracleConnector.class.getName());
debeziumProps.setProperty(Constants.DATABASE_DBNAME_PROP, props.getDbname());
debeziumProps.setProperty("log.mining.strategy", "online_catalog");
......
......@@ -59,6 +59,11 @@
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
......
......@@ -36,9 +36,5 @@
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.tbyf.cdcengine2.sqlserver;
import com.tbyf.cdcengine2.core.AbstractCdcEngine;
import com.tbyf.cdcengine2.core.ChangeHandler;
import com.tbyf.cdcengine2.core.ChangedRecord;
import com.tbyf.cdcengine2.core.Constants;
import io.debezium.connector.sqlserver.SqlServerConnector;
......@@ -23,7 +21,7 @@ public class SqlServerCdcEngine extends AbstractCdcEngine<SqlServerCdcEngine> {
debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, SqlServerConnector.class.getName());
debeziumProps.setProperty(Constants.DATABASE_DBNAME_PROP, props.getDbName());
String schema = SqlServerCdcEngineProperties.SCHEMA_NAME;
String schema = SqlServerCdcEngineProperties.DATABASE_OWNER_SCHEMA_NAME;
String capturedTables = new HashSet<>(Arrays.asList(props.getCapturedTables().toUpperCase().split(",")))
.stream().map(tableName -> schema + "." + tableName)
.collect(Collectors.joining(","));
......@@ -46,7 +44,7 @@ public class SqlServerCdcEngine extends AbstractCdcEngine<SqlServerCdcEngine> {
props.setUsername("sa");
props.setPassword("1");
props.setDbName("dz_his");
props.setCapturedTables("BA_BRDA");
props.setCapturedTables("BA_BAJY");
SqlServerCdcEngine engine = new SqlServerCdcEngine(props);
engine.onChange(record -> {
......
......@@ -4,7 +4,7 @@ import com.tbyf.cdcengine2.core.BaseCdcEngineProperties;
public class SqlServerCdcEngineProperties extends BaseCdcEngineProperties {
public static final String SCHEMA_NAME = "dbo";
public static final String DATABASE_OWNER_SCHEMA_NAME = "dbo";
private String dbName;
private String capturedTables;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment