Commit 9b8b1a66 by y1sa

sql server CDC实现

parent c16ad6fd
......@@ -45,7 +45,18 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
private Runnable errorHandler;
public AbstractCdcEngine() {
public AbstractCdcEngine(BaseCdcEngineProperties props) {
debeziumProps.setProperty(Constants.DATABASE_HISTORY_FILE_FILENAME_PROP, props.getNamespace() + "_his.dat");
debeziumProps.setProperty(Constants.OFFSET_STORAGE_FILE_FILENAME_PROP, props.getNamespace() + "_off.dat");
debeziumProps.setProperty("snapshot.mode", "schema_only");
debeziumProps.setProperty("database.history.skip.unparseable.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.history.store.only.captured.tables.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.hostname", props.getHost());
debeziumProps.setProperty("database.port", String.valueOf(props.getPort()));
debeziumProps.setProperty("database.user", props.getUsername());
debeziumProps.setProperty("database.password", props.getPassword());
debeziumProps.setProperty(Constants.DATABASE_SERVER_NAME_PROP, Constants.DATABASE_SERVER_NAME_VALUE);
debeziumProps.setProperty(Constants.DATABASE_HISTORY_PROP, FileDatabaseHistory.class.getName());
debeziumProps.setProperty("name", getClass().getSimpleName());
......
package com.tbyf.cdcengine2.core;
public class BaseCdcEngineProperties {
private String namespace = "default";
private String host = "localhost";
private int port;
private String username;
private String password;
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
......@@ -17,4 +17,6 @@ public class Constants {
public static final String SCHEMA_INCLUDE_LIST_PROP = "schema.include.list";
public static final String TABLE_INCLUDE_LIST_PROP = "table.include.list";
public static final String DATABASE_DBNAME_PROP = "database.dbname";
}
......@@ -17,22 +17,11 @@ public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> {
}
public OracleCdcEngine(OracleCdcEngineProperties props) {
super(props);
debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, OracleConnector.class.getName());
debeziumProps.setProperty(Constants.DATABASE_HISTORY_FILE_FILENAME_PROP, props.getNamespace() + "_his.dat");
debeziumProps.setProperty(Constants.OFFSET_STORAGE_FILE_FILENAME_PROP, props.getNamespace() + "_off.dat");
debeziumProps.setProperty("database.hostname", props.getHost());
debeziumProps.setProperty("database.port", String.valueOf(props.getPort()));
debeziumProps.setProperty("database.user", props.getUsername());
debeziumProps.setProperty("database.password", props.getPassword());
debeziumProps.setProperty("database.dbname", props.getDbname());
debeziumProps.setProperty(Constants.DATABASE_DBNAME_PROP, props.getDbname());
debeziumProps.setProperty("log.mining.strategy", "online_catalog");
debeziumProps.setProperty("snapshot.mode", "schema_only");
debeziumProps.setProperty("database.history.skip.unparseable.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.history.store.only.captured.tables.ddl", Boolean.toString(true));
String schema = props.getUsername().toUpperCase();
debeziumProps.setProperty(Constants.SCHEMA_INCLUDE_LIST_PROP, schema);
......
package com.tbyf.cdcengine2.oracle;
public class OracleCdcEngineProperties {
import com.tbyf.cdcengine2.core.BaseCdcEngineProperties;
public class OracleCdcEngineProperties extends BaseCdcEngineProperties {
private String host = "localhost";
private int port = 1521;
private String dbname;
private String username;
private String password;
private String capturedTables;
private String namespace = "default";
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDbname() {
return dbname;
......@@ -34,22 +15,6 @@ public class OracleCdcEngineProperties {
this.dbname = dbname;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getCapturedTables() {
return capturedTables;
}
......@@ -58,13 +23,4 @@ public class OracleCdcEngineProperties {
this.capturedTables = capturedTables;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
......@@ -21,6 +21,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<debezium.version>1.9.0.Final</debezium.version>
<engine.version>1.0.0</engine.version>
<mssql-jdbc.version>9.2.0.jre8</mssql-jdbc.version>
</properties>
<dependencyManagement>
......@@ -52,6 +53,16 @@
<artifactId>core</artifactId>
<version>${engine.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
......@@ -17,4 +17,28 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>core</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<exclusions>
<exclusion>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.tbyf.cdcengine2.sqlserver;
import com.tbyf.cdcengine2.core.AbstractCdcEngine;
import com.tbyf.cdcengine2.core.ChangeHandler;
import com.tbyf.cdcengine2.core.ChangedRecord;
import com.tbyf.cdcengine2.core.Constants;
import io.debezium.connector.sqlserver.SqlServerConnector;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.Collectors;
public class SqlServerCdcEngine extends AbstractCdcEngine<SqlServerCdcEngine> {
static {
SqlServerUtils.loadDriver();
}
public SqlServerCdcEngine(SqlServerCdcEngineProperties props) {
super(props);
debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, SqlServerConnector.class.getName());
debeziumProps.setProperty(Constants.DATABASE_DBNAME_PROP, props.getDbName());
String schema = SqlServerCdcEngineProperties.SCHEMA_NAME;
String capturedTables = new HashSet<>(Arrays.asList(props.getCapturedTables().toUpperCase().split(",")))
.stream().map(tableName -> schema + "." + tableName)
.collect(Collectors.joining(","));
debeziumProps.setProperty(Constants.TABLE_INCLUDE_LIST_PROP, capturedTables);
try (Connection conn = SqlServerUtils.getConn(props.getHost(), String.valueOf(props.getPort()), props.getUsername(), props.getPassword())) {
SqlServerCdcHelper.enableDBCdcIfNeeded(conn, props.getDbName());
for (String table : props.getCapturedTables().split(",")) {
SqlServerCdcHelper.enableTableCdcIfNeeded(conn, props.getDbName(), table);
}
} catch (SQLException ex) {
throw new IllegalStateException("开启CDC失败", ex);
}
}
public static void main(String[] args) {
SqlServerCdcEngineProperties props = new SqlServerCdcEngineProperties();
props.setHost("192.168.0.85");
props.setPort(1433);
props.setUsername("sa");
props.setPassword("1");
props.setDbName("dz_his");
props.setCapturedTables("BA_BRDA");
SqlServerCdcEngine engine = new SqlServerCdcEngine(props);
engine.onChange(record -> {
System.out.println(record);
});
engine.start();
}
}
package com.tbyf.cdcengine2.sqlserver;
import com.tbyf.cdcengine2.core.BaseCdcEngineProperties;
public class SqlServerCdcEngineProperties extends BaseCdcEngineProperties {
public static final String SCHEMA_NAME = "dbo";
private String dbName;
private String capturedTables;
public String getDbName() {
return dbName;
}
public void setDbName(String dbName) {
this.dbName = dbName;
}
public String getCapturedTables() {
return capturedTables;
}
public void setCapturedTables(String capturedTables) {
this.capturedTables = capturedTables;
}
}
package com.tbyf.cdcengine2.sqlserver;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class SqlServerCdcHelper {
private static final String SWITCH_DB_SQL = "USE <dbName>";
private static final String DB_CDC_ENABLED_QUERY_SQL =
"SELECT is_cdc_enabled \n" +
"FROM sys.databases \n" +
"WHERE name = '${dbName}'";
private static final String ENABLE_DB_CDC_SQL =
"EXEC sys.sp_cdc_enable_db";
private static final String TABLE_CDC_ENABLED_QUERY_SQL =
"SELECT is_tracked_by_cdc \n" +
"FROM sys.tables \n" +
"WHERE name = '${tableName}' AND schema_id = SCHEMA_ID('dbo')";
private static final String ENABLE_TABLE_CDC_SQL =
"EXEC sys.sp_cdc_enable_table \n" +
"@source_schema = N'dbo', \n" +
"@source_name = N'${tableName}', \n" +
"@role_name = N'NULL', \n" + // TODO role_name
"@supports_net_changes = 0";
public static boolean isDBCdcEnabled(Connection conn, String dbName) {
try (Statement stmt = conn.createStatement()) {
switchDB(stmt, dbName);
String sql = DB_CDC_ENABLED_QUERY_SQL.replace("${dbName}", dbName);
try (ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return rs.getInt(1) == 1;
} else {
throw new IllegalStateException("未查询到数据库CDC开启状态");
}
}
} catch (SQLException e) {
throw new IllegalStateException("查询数据库CDC开启状态时发生了异常", e);
}
}
public static void enableDBCdcIfNeeded(Connection conn, String dbName) {
if (!isDBCdcEnabled(conn, dbName)) {
enableDBCdc(conn, dbName);
}
}
public static void enableDBCdc(Connection conn, String dbName) {
try (Statement stmt = conn.createStatement()) {
switchDB(stmt, dbName);
stmt.execute(ENABLE_DB_CDC_SQL);
} catch (SQLException e) {
throw new IllegalStateException("开启数据库CDC失败", e);
}
}
// 查询表是否开启了CDC
public static boolean isTableCdcEnabled(Connection conn, String dbName, String tableName) {
try (Statement stmt = conn.createStatement()) {
switchDB(stmt, dbName);
String sql = TABLE_CDC_ENABLED_QUERY_SQL.replace("${tableName}", tableName);
try (ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return rs.getInt(1) == 1;
} else {
throw new IllegalStateException("未查询到表CDC开启状态");
}
}
} catch (SQLException e) {
throw new IllegalStateException("查询表CDC开启状态时发生了异常", e);
}
}
public static void enableTableCdc(Connection conn, String dbName, String tableName) {
try (Statement stmt = conn.createStatement()) {
switchDB(stmt, dbName);
String sql = ENABLE_TABLE_CDC_SQL.replace("${tableName}", tableName);
stmt.execute(sql);
} catch (SQLException e) {
throw new IllegalStateException("开启表CDC失败", e);
}
}
public static void enableTableCdcIfNeeded(Connection conn, String dbName, String tableName) {
if (!isTableCdcEnabled(conn, dbName, tableName)) {
enableTableCdc(conn, dbName, tableName);
}
}
private static void switchDB(Statement stmt, String dbName) throws SQLException {
String sql = SWITCH_DB_SQL.replace("<dbName>", dbName);
stmt.execute(sql);
}
}
package com.tbyf.cdcengine2.sqlserver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class SqlServerUtils {
public static final String SQL_SERVER_DRIVER_CLASS_NAME = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
public static final String SQL_SERVER_URL_PATTERN = "jdbc:sqlserver://<host>:<port>";
public static void loadDriver() {
try {
Class.forName(SQL_SERVER_DRIVER_CLASS_NAME);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("未找到sqlserver jdbc驱动");
}
}
public static String buildUrl(String host, String port) {
return SQL_SERVER_URL_PATTERN.replace("<host>", host)
.replace("<port>", port);
}
public static Connection getConn(String host, String port, String user, String password) {
try {
return DriverManager.getConnection(buildUrl(host, port), user, password);
} catch (SQLException e) {
throw new IllegalStateException("获取数据库连接失败", e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment