Commit 01e3d3a7 by y1sa

首次提交

parents
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
.idea/
\ No newline at end of file
source.host=192.168.0.85
source.port=1433
source.user=sa
source.password=1
source.dbname=dz_his
sync.tables=dbo.BA_BRDA(ZYH)
dest.host=192.168.0.85
dest.port=1433
dest.user=sa
dest.password=1
dest.dbname=dz_emr
\ No newline at end of file
{"source":{"server":"server1"},"position":{"commit_lsn":"000419dc:00000010:007b","snapshot":true,"snapshot_completed":false},"databaseName":"dz_his","schemaName":"dbo","tableChanges":[{"type":"CREATE","id":"\"dz_his\".\"dbo\".\"BA_BRDA\"","table":{"defaultCharsetName":null,"primaryKeyColumnNames":["ZYH"],"columns":[{"name":"ZYH","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":18,"scale":0,"position":1,"optional":false,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":false,"enumValues":[]},{"name":"BAHM","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":12,"position":2,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"ZYHM","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":12,"position":3,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"MZHM","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":32,"position":4,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"FZHM","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":12,"position":5,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"BRXM","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":12,"position":6,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"BRLY","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":7,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"RYCS","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":8,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"defaultValueExpression":"((1))","enumValues":[]},{"name":"BRXB","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":9,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"BRXZ","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":10,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"GZDW","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":40,"position":11,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CSNY","jdbcType":93,"typeName":"datetime","typeExpression":"datetime","charsetName":null,"length":23,"scale":3,"position":12,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"BRNL","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":13,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"SFZH","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":20,"position":14,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"HYZK","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":15,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"ZYDM","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":16,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"SFDM","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":17,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"JGDM","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":18,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"MZDM","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":19,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"GJDM","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":20,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"DWDZ","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":40,"position":21,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"DWDH","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":16,"position":22,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"DWYB","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":6,"position":23,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"HKDZ","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":40,"position":24,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"HKDH","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":16,"position":25,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"HKYB","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":6,"position":26,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"LXRM","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":12,"position":27,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"LXGX","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":28,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"LXDZ","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":40,"position":29,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"LXDH","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":16,"position":30,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"LXYB","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":6,"position":31,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"ZZTX","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":32,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"DWBH","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":33,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"SBHM","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":10,"position":34,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"GFZH","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":20,"position":35,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"SJDM","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":36,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"XJDM","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":4,"scale":0,"position":37,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"GZDWJDZ","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":200,"position":38,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"XZZ","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":200,"position":39,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"ZZDH","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":40,"position":40,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"XZZYB","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":6,"position":41,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YENL","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":42,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YETZ1","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":43,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YETZ2","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":44,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YETZ3","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":45,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YETZ4","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":46,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YETZ5","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":47,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YERYTZ","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":48,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CSSF","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":30,"position":49,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CSDS","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":30,"position":50,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CSDX","jdbcType":12,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":30,"position":51,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YERYTZ1","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":52,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YERYTZ2","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":53,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YERYTZ3","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":54,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"YERYTZ4","jdbcType":2,"typeName":"numeric","typeExpression":"numeric","charsetName":null,"length":6,"scale":0,"position":55,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]}]},"comment":null}]}
2024-07-16 15:42:13 [main] ERROR c.t.d.datasync.core.DataSyncWriter - 违反了 PRIMARY KEY 约束 'PK_EMR_BL_BAGD'。不能在对象 'dbo.EMR_BL_BAGD' 中插入重复键。
java.sql.BatchUpdateException: 违反了 PRIMARY KEY 约束 'PK_EMR_BL_BAGD'。不能在对象 'dbo.EMR_BL_BAGD' 中插入重复键。
at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2085)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeBatch(DruidPooledPreparedStatement.java:551)
at com.tbyd.data.datasync.core.DataSyncWriter.commitPrepareStatement(DataSyncWriter.java:226)
at com.tbyd.data.datasync.core.DataSyncWriter.handleBatch(DataSyncWriter.java:127)
at com.tbyd.data.datasync.Main.lambda$main$0(Main.java:41)
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
at com.tbyd.data.datasync.Main.main(Main.java:47)
File added
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tbyf.data</groupId>
<artifactId>sql-server-datasync</artifactId>
<version>1.0-SNAPSHOT</version>
<description>基于CDC实时同步sql server</description>
<properties>
<main.class>com.tbyd.data.datasync.Main</main.class>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<debezium.version>1.9.8.Final</debezium.version>
<mssql-jdbc.version>9.2.0.jre8</mssql-jdbc.version>
<commons-lang3.version>3.10</commons-lang3.version>
<logback.version>1.2.12</logback.version>
<lombok.version>1.18.30</lombok.version>
<druid.version>1.2.22</druid.version>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${debezium.version}</version>
<exclusions>
<exclusion>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>${main.class}</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.tbyd.data.datasync;
import com.tbyd.data.datasync.config.DataSyncProperties;
import com.tbyd.data.datasync.config.DebeziumProperties;
import com.tbyd.data.datasync.core.AppCleaner;
import com.tbyd.data.datasync.core.DataSyncInitializer;
import com.tbyd.data.datasync.core.DataSyncWriter;
import com.tbyd.data.datasync.core.RecordHandler;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Properties;
@Slf4j
public class Main {
public static void main(String[] args) throws Throwable {
if (args.length == 0) {
System.err.println("请指定配置文件");
return;
}
new AppCleaner().cleanBeforeStart();
Properties props = DebeziumProperties.get();
Properties dataSyncProps = DataSyncProperties.loadAndValidate(args[0]);
DebeziumProperties.mergeDataSyncProps(props, dataSyncProps);
DataSyncInitializer initializer = new DataSyncInitializer();
initializer.initialize(props);
RecordHandler recordHandler = new DataSyncWriter(props);
DebeziumEngine<?> engine = DebeziumEngine.create(Connect.class)
.using(props)
.using(OffsetCommitPolicy.always())
.notifying((records, commiter) -> {
try {
recordHandler.handleBatch(records, commiter);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}).build();
addShutdownHook(engine);
engine.run();
}
private static void addShutdownHook(DebeziumEngine<?> engine) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
engine.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}));
}
}
package com.tbyd.data.datasync.config;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class DataSyncProperties {
public static final String SOURCE_HOST_KEY = "source.host";
public static final String SOURCE_PORT_KEY = "source.port";
public static final String SOURCE_USER_KEY = "source.user";
public static final String SOURCE_PASSWORD_KEY = "source.password";
public static final String SOURCE_DBNAME_KEY = "source.dbname";
public static final String DEST_HOST_KEY = "dest.host";
public static final String DEST_PORT_KEY = "dest.port";
public static final String DEST_USER_KEY = "dest.user";
public static final String DEST_PASSWORD_KEY = "dest.password";
public static final String DEST_DBNAME_KEY = "dest.dbname";
/**
* 指定需要同步的表,多个表之间用逗号,分隔
* 可以为表指定唯一键,复合唯一键之间也用逗号,分隔
* eg: tb1,tb2(k1,k2),tb3(k3)
*/
public static final String SYNC_TABLES_KEY = "sync.tables";
public static final String DEFAULT_PORT = "1433";
private static final Pattern tablesWithKeyPattern = Pattern.compile("([^(,]+)(\\(([^\\)]*)\\))?");
public static Properties loadAndValidate(String location) {
Properties props = new Properties();
try (InputStream is = Files.newInputStream(Paths.get(location))) {
props.load(is);
} catch (IOException e) {
throw new IllegalStateException("加载配置文件失败", e);
}
if (StringUtils.isBlank(getSourceHost(props))) {
throw new IllegalStateException("缺失配置项:" + SOURCE_HOST_KEY);
}
if (StringUtils.isBlank(getSourceUser(props))) {
throw new IllegalStateException("缺失配置项:" + SOURCE_USER_KEY);
}
if (StringUtils.isBlank(getSourcePassword(props))) {
throw new IllegalStateException("缺失配置项:" + SOURCE_PASSWORD_KEY);
}
if (StringUtils.isBlank(getSourceDbname(props))) {
throw new IllegalStateException("缺失配置项:" + SOURCE_DBNAME_KEY);
}
if (StringUtils.isBlank(getDestinationHost(props))) {
throw new IllegalStateException("缺失配置项:" + DEST_HOST_KEY);
}
if (StringUtils.isBlank(getDestinationUser(props))) {
throw new IllegalStateException("缺失配置项:" + DEST_USER_KEY);
}
if (StringUtils.isBlank(getDestinationPassword(props))) {
throw new IllegalStateException("缺失配置项:" + DEST_PASSWORD_KEY);
}
if (StringUtils.isBlank(getDestinationDbname(props))) {
throw new IllegalStateException("缺失配置项:" + DEST_DBNAME_KEY);
}
if (StringUtils.isBlank(getSyncTableNamesWithKey(props))) {
throw new IllegalStateException("缺失配置项:" + SYNC_TABLES_KEY);
}
return props;
}
public static String getSourceHost(Properties props) {
return props.getProperty(SOURCE_HOST_KEY);
}
public static String getSourcePort(Properties props) {
return props.getProperty(SOURCE_PORT_KEY, DEFAULT_PORT);
}
public static String getSourceUser(Properties props) {
return props.getProperty(SOURCE_USER_KEY);
}
public static String getSourcePassword(Properties props) {
return props.getProperty(SOURCE_PASSWORD_KEY);
}
public static String getSourceDbname(Properties props) {
return props.getProperty(SOURCE_DBNAME_KEY);
}
public static String getDestinationHost(Properties props) {
return props.getProperty(DEST_HOST_KEY);
}
public static String getDestinationPort(Properties props) {
return props.getProperty(DEST_PORT_KEY, DEFAULT_PORT);
}
public static String getDestinationUser(Properties props) {
return props.getProperty(DEST_USER_KEY);
}
public static String getDestinationPassword(Properties props) {
return props.getProperty(DEST_PASSWORD_KEY);
}
public static String getDestinationDbname(Properties props) {
return props.getProperty(DEST_DBNAME_KEY);
}
public static String getSyncTableNamesWithKey(Properties props) {
return props.getProperty(SYNC_TABLES_KEY);
}
public static String getSyncTableNames(Properties props) {
String tablesWithKey = getSyncTableNamesWithKey(props);
List<String> tables = new ArrayList<>();
Matcher matcher = tablesWithKeyPattern.matcher(tablesWithKey);
while (matcher.find()) {
tables.add(matcher.group(1));
}
return String.join(",", tables);
}
public static String[] getSyncTableNamesArray(Properties props) {
String tablesWithKey = getSyncTableNamesWithKey(props);
List<String> tables = new ArrayList<>();
Matcher matcher = tablesWithKeyPattern.matcher(tablesWithKey);
while (matcher.find()) {
tables.add(matcher.group(1));
}
return tables.toArray(new String[0]);
}
public static List<TableAndKeys> getSyncTables(Properties props) {
String tablesWithKey = getSyncTableNamesWithKey(props);
List<TableAndKeys> result = new ArrayList<>();
Matcher matcher = tablesWithKeyPattern.matcher(tablesWithKey);
while (matcher.find()) {
String table = matcher.group(1);
String keys = matcher.group(3);
String[] keysArr = StringUtils.split(keys, ",");
result.add(new TableAndKeys(table, new HashSet<>(Arrays.asList(keysArr))));
}
return result;
}
}
package com.tbyd.data.datasync.config;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class DebeziumProperties {
private static final String PROPERTIES_LOCATION = "debezium.properties";
public static final String DB_PREFIX = "database.";
public static final String DB_HOST_KEY = DB_PREFIX + "hostname";
public static final String DB_PORT_KEY = DB_PREFIX + "port";
public static final String DB_USER_KEY = DB_PREFIX + "user";
public static final String DB_PASSWORD_KEY = DB_PREFIX + "password";
public static final String DB_DBNAME_KEY = DB_PREFIX + "dbname";
public static final String CAPTURED_TABLES_KEY = "table.include.list";
public static Properties get() {
ClassLoader cld = DebeziumProperties.class.getClassLoader();
try (InputStream is = cld.getResourceAsStream(PROPERTIES_LOCATION)) {
Properties props = new Properties();
props.load(is);
return props;
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
public static void mergeDataSyncProps(Properties debeziumProps, Properties dataSyncProps) {
debeziumProps.putAll(dataSyncProps);
debeziumProps.setProperty(DB_HOST_KEY, DataSyncProperties.getSourceHost(dataSyncProps));
debeziumProps.setProperty(DB_PORT_KEY, DataSyncProperties.getSourcePort(dataSyncProps));
debeziumProps.setProperty(DB_USER_KEY, DataSyncProperties.getSourceUser(dataSyncProps));
debeziumProps.setProperty(DB_PASSWORD_KEY, DataSyncProperties.getSourcePassword(dataSyncProps));
debeziumProps.setProperty(DB_DBNAME_KEY, DataSyncProperties.getSourceDbname(dataSyncProps));
debeziumProps.setProperty(CAPTURED_TABLES_KEY, DataSyncProperties.getSyncTableNames(dataSyncProps));
}
}
package com.tbyd.data.datasync.config;
import lombok.ToString;
import java.util.Set;
@ToString
public class TableAndKeys {
private final String table;
/**
* 唯一键
*/
private final Set<String> keys;
public TableAndKeys(String table, Set<String> keys) {
this.table = table;
this.keys = keys;
}
public String getTable() {
return table;
}
public Set<String> getKeys() {
return keys;
}
}
package com.tbyd.data.datasync.core;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class AppCleaner {
public void cleanBeforeStart() {
try {
Files.delete(Paths.get("offset.dat"));
} catch (IOException ignore) {
}
try {
Files.delete(Paths.get("dbhistory.dat"));
} catch (IOException ignore) {
}
}
}
package com.tbyd.data.datasync.core;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@Slf4j
public class CdcHelper {
private static final String DB_CDC_ENABLED_QUERY_SQL =
"SELECT is_cdc_enabled \n" +
"FROM sys.databases \n" +
"WHERE name = '${dbName}'";
private static final String ENABLE_DB_CDC_SQL =
"EXEC sys.sp_cdc_enable_db";
private static final String TABLE_CDC_ENABLED_QUERY_SQL =
"SELECT is_tracked_by_cdc \n" +
"FROM sys.tables \n" +
"WHERE name = '${tableName}' AND schema_id = SCHEMA_ID('dbo')";
private static final String ENABLE_TABLE_CDC_SQL =
"EXEC sys.sp_cdc_enable_table \n" +
"@source_schema = N'dbo', \n" +
"@source_name = N'${tableName}', \n" +
"@role_name = N'NULL', \n" + // TODO role_name
"@supports_net_changes = 0";
// 查询数据库是否开启了CDC
public static boolean isDBCdcEnabled(Connection conn, String dbName) {
try (Statement stmt = conn.createStatement()) {
DBUtils.switchDB(stmt, dbName);
String sql = DB_CDC_ENABLED_QUERY_SQL.replace("${dbName}", dbName);
try (ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return rs.getInt(1) == 1;
}
}
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
throw new IllegalStateException("查询数据库[" + dbName + "]CDC开启状态失败");
}
public static void enableDBCdcIfNeeded(Connection conn, String dbName) {
if (!isDBCdcEnabled(conn, dbName)) {
enableDBCdc(conn, dbName);
}
}
public static void enableDBCdc(Connection conn, String dbName) {
try (Statement stmt = conn.createStatement()) {
DBUtils.switchDB(stmt, dbName);
stmt.execute(ENABLE_DB_CDC_SQL);
} catch (SQLException e) {
log.error(e.getMessage(), e);
throw new IllegalStateException("开启数据库[" + dbName + "]CDC失败", e);
}
}
// 查询表是否开启了CDC
public static boolean isTableCdcEnabled(Connection conn, String dbName, String tableName) {
try (Statement stmt = conn.createStatement()) {
DBUtils.switchDB(stmt, dbName);
String sql = TABLE_CDC_ENABLED_QUERY_SQL.replace("${tableName}", tableName);
try (ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return rs.getInt(1) == 1;
}
}
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
throw new IllegalStateException("查询表[" + dbName + "." + tableName + "]CDC开启状态失败");
}
public static void enableTableCdc(Connection conn, String dbName, String tableName) {
try (Statement stmt = conn.createStatement()) {
DBUtils.switchDB(stmt, dbName);
String sql = ENABLE_TABLE_CDC_SQL.replace("${tableName}", tableName);
stmt.execute(sql);
} catch (SQLException e) {
log.error(e.getMessage(), e);
throw new IllegalStateException("开启表[" + dbName + "." + tableName + "]CDC失败");
}
}
public static void enableTableCdcIfNeeded(Connection conn, String dbName, String tableName) {
if (!isTableCdcEnabled(conn, dbName, tableName)) {
enableTableCdc(conn, dbName, tableName);
}
}
}
\ No newline at end of file
package com.tbyd.data.datasync.core;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class DBUtils {
public static final String SQL_SERVER_DRIVER_CLASS_NAME = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
static {
try {
Class.forName(SQL_SERVER_DRIVER_CLASS_NAME);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("未找到sql server驱动", e);
}
}
public static final String SQL_SERVER_URL_PATTERN = "jdbc:sqlserver://<host>:<port>";
private static final String SWITCH_DB_SQL = "USE <dbName>";
private static final String METADATA_QUERY_SQL = "SELECT * FROM ${tableName} WHERE 1 = 2";
public static TableMetadata getTableMetadata(Connection conn, String tableName) {
String sql = METADATA_QUERY_SQL.replace("${tableName}", tableName);
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery(sql)) {
ResultSetMetaData rsMetaData = rs.getMetaData();
int columnCount = rsMetaData.getColumnCount();
TableMetadata tableMetadata = new TableMetadata();
tableMetadata.setTableName(tableName);
tableMetadata.setColumnCount(columnCount);
String[] columNames = new String[columnCount];
Map<String, Integer> columnIndexes = new HashMap<>();
for (int i = 0; i < columnCount; i++) {
columNames[i] = rsMetaData.getColumnName(i + 1);
columnIndexes.put(columNames[i], i);
}
tableMetadata.setColumnNames(columNames);
tableMetadata.setColumnIndexes(columnIndexes);
return tableMetadata;
}
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
public static String getSqlServerUrl(String host, String port) {
return SQL_SERVER_URL_PATTERN.replace("<host>", host)
.replace("<port>", port);
}
public static void switchDB(Statement stmt, String dbName) throws SQLException {
String sql = SWITCH_DB_SQL.replace("<dbName>", dbName);
stmt.execute(sql);
}
public static Connection getConn(String host, String port, String user, String password) throws SQLException {
return DriverManager.getConnection(getSqlServerUrl(host, port), user, password);
}
public static void closeResource(Connection conn) {
closeResource(null, null, conn);
}
public static void closeResource(ResultSet rs, Statement stmt, Connection conn) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
}
}
}
package com.tbyd.data.datasync.core;
import com.tbyd.data.datasync.config.DataSyncProperties;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.util.Properties;
@Slf4j
public class DataSyncInitializer {
private static final String DBO_SCHEMA_ID = "dbo";
public void initialize(Properties props) {
String sourceHost = DataSyncProperties.getSourceHost(props);
String sourcePort = DataSyncProperties.getSourcePort(props);
String sourceUser = DataSyncProperties.getSourceUser(props);
String sourcePassword = DataSyncProperties.getSourcePassword(props);
String sourceDbname = DataSyncProperties.getSourceDbname(props);
String[] syncTableNames = DataSyncProperties.getSyncTableNamesArray(props);
try {
Connection conn = DBUtils.getConn(sourceHost, sourcePort, sourceUser, sourcePassword);
CdcHelper.enableDBCdcIfNeeded(conn, sourceDbname);
DBUtils.closeResource(conn);
for (String tableName : syncTableNames) {
conn = DBUtils.getConn(sourceHost, sourcePort, sourceUser, sourcePassword);
CdcHelper.enableTableCdcIfNeeded(conn, sourceDbname, stripSchemaId(tableName));
DBUtils.closeResource(conn);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new IllegalStateException("初始化失败", e);
}
}
private static String stripSchemaId(String tableName) {
String schemaIdPrefix = DBO_SCHEMA_ID + ".";
return tableName.substring(schemaIdPrefix.length());
}
}
package com.tbyd.data.datasync.core;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.tbyd.data.datasync.config.DataSyncProperties;
import com.tbyd.data.datasync.config.TableAndKeys;
import io.debezium.data.Envelope;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class DataSyncWriter implements RecordHandler {
private final DruidDataSource dataSource;
private static final String JDBC_PATTERN = "jdbc:sqlserver://<server_name>:<port>;databaseName=<database_name>";
private final Properties props;
private final Map<String, TableMetadata> metadataCache = new HashMap<>();
public DataSyncWriter(Properties props) {
String host = DataSyncProperties.getDestinationHost(props);
String port = DataSyncProperties.getDestinationPort(props);
String user = DataSyncProperties.getDestinationUser(props);
String password = DataSyncProperties.getDestinationPassword(props);
String dbname = DataSyncProperties.getDestinationDbname(props);
String url = buildUrl(host, port, dbname);
dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setUsername(user);
dataSource.setPassword(password);
this.props = props;
init();
}
private void init() {
for (TableAndKeys tableAndKeys : DataSyncProperties.getSyncTables(props)) {
try (DruidPooledConnection conn = dataSource.getConnection()) {
String tableName = tableAndKeys.getTable();
TableMetadata tableMetadata = DBUtils.getTableMetadata(conn, tableName);
tableMetadata.setKeys(tableAndKeys.getKeys());
metadataCache.put(tableName, tableMetadata);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}
private static String buildUrl(String host, String port, String dbName) {
return JDBC_PATTERN.replace("<server_name>", host)
.replace("<port>", port)
.replace("<database_name>", dbName);
}
@Override
public boolean supports(ChangeEvent<SourceRecord, SourceRecord> record) {
SourceRecord r = record.value();
Struct value = (Struct) r.value();
if (value == null) {
return false;
}
Schema valueSchema = r.valueSchema();
if (valueSchema.name().equals("io.debezium.connector.sqlserver.SchemaChangeValue")) {
return false;
}
return true;
}
@Override
public void handle(ChangeEvent<SourceRecord, SourceRecord> record) {
throw new UnsupportedOperationException();
}
@Override
public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws Exception {
RecordChangeEvent prevEvent = null;
int maxBatchSize = 1500;
int cnt = 0;
for (ChangeEvent<SourceRecord, SourceRecord> record : records) {
if (supports(record)) {
cnt++;
if (cnt >= maxBatchSize) {
cnt = 0;
if (prevEvent != null) {
commitPrepareStatement((PreparedStatement) prevEvent.attachment);
}
prevEvent = null;
}
String tableName = record.destination().substring(
record.destination().indexOf(".") + 1);
Struct value = (Struct) record.value().value();
Schema valueSchema = record.value().valueSchema();
Struct before = (Struct) value.get(valueSchema.field(Envelope.FieldName.BEFORE));
Struct after = (Struct) value.get(valueSchema.field(Envelope.FieldName.AFTER));
String opCode = (String) value.get(valueSchema.field(Envelope.FieldName.OPERATION));
Envelope.Operation op = Envelope.Operation.forCode(opCode);
RecordChangeEvent curEvent = new RecordChangeEvent(tableName, op,
resolveColumns(before),
resolveColumns(after));
try {
doHandle(curEvent, prevEvent);
} catch (Exception e) {
log.error("处理记录变更时发生错误", e);
log.error("table: {}, before: {}, after: {}", curEvent.tableName, curEvent.before, curEvent.after);
}
if (prevEvent != null) {
prevEvent.attachment = null;
}
prevEvent = curEvent;
}
committer.markProcessed(record);
}
if (prevEvent != null) {
commitPrepareStatement((PreparedStatement) prevEvent.attachment);
}
committer.markBatchFinished();
}
static class RecordChangeEvent {
String tableName;
Envelope.Operation op;
Object[] before;
Object[] after;
Object attachment;
public RecordChangeEvent(String tableName, Envelope.Operation op, Object[] before, Object[] after) {
this.tableName = tableName;
this.op = op;
this.before = before;
this.after = after;
}
}
private void doHandle(RecordChangeEvent curEvent, RecordChangeEvent prevEvent) throws SQLException {
switch (curEvent.op) {
case CREATE:
doHandleCreate(curEvent, prevEvent);
return;
case UPDATE:
doHandleUpdate(curEvent, prevEvent);
return;
case DELETE:
doHandleDelete(curEvent, prevEvent);
return;
}
}
private void doHandleDelete(RecordChangeEvent curEvent, RecordChangeEvent prevEvent) throws SQLException {
PreparedStatement stmt;
if (determineUseSamePreparedStatement(curEvent, prevEvent)) {
stmt = (PreparedStatement) prevEvent.attachment;
} else {
if (prevEvent != null) {
commitPrepareStatement((PreparedStatement) prevEvent.attachment);
}
stmt = getConn().prepareStatement(buildDeleteSql(metadataCache.get(curEvent.tableName)));
}
curEvent.attachment = stmt;
TableMetadata tableMetadata = metadataCache.get(curEvent.tableName);
String[] keys = tableMetadata.getKeysArray();
for (int i = 0; i < keys.length; i++) {
stmt.setObject(i + 1, curEvent.before[tableMetadata.getIndex(keys[i])]);
}
stmt.addBatch();
}
private void doHandleUpdate(RecordChangeEvent curEvent, RecordChangeEvent prevEvent) throws SQLException {
PreparedStatement stmt;
if (determineUseSamePreparedStatement(curEvent, prevEvent)) {
stmt = (PreparedStatement) prevEvent.attachment;
} else {
if (prevEvent != null) {
commitPrepareStatement((PreparedStatement) prevEvent.attachment);
}
stmt = getConn().prepareStatement(buildUpdateSql(metadataCache.get(curEvent.tableName)));
}
curEvent.attachment = stmt;
for (int i = 0; i < curEvent.after.length; i++) {
stmt.setObject(i + 1, curEvent.after[i]);
}
TableMetadata tableMetadata = metadataCache.get(curEvent.tableName);
String[] keys = tableMetadata.getKeysArray();
for (int i = 0; i < keys.length; i++) {
stmt.setObject(curEvent.after.length + i + 1, curEvent.before[tableMetadata.getIndex(keys[i])]);
}
stmt.addBatch();
}
private void doHandleCreate(RecordChangeEvent curEvent, RecordChangeEvent prevEvent) throws SQLException {
PreparedStatement stmt;
if (determineUseSamePreparedStatement(curEvent, prevEvent)) {
stmt = (PreparedStatement) prevEvent.attachment;
} else {
if (prevEvent != null) {
commitPrepareStatement((PreparedStatement) prevEvent.attachment);
}
stmt = getConn().prepareStatement(buildInsertSql(metadataCache.get(curEvent.tableName)));
}
curEvent.attachment = stmt;
for (int i = 0; i < curEvent.after.length; i++) {
stmt.setObject(i + 1, curEvent.after[i]);
}
stmt.addBatch();
}
private Connection getConn() throws SQLException {
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
return conn;
}
private static void commitPrepareStatement(PreparedStatement stmt) {
Connection conn = null;
try {
if (stmt != null) {
conn = stmt.getConnection();
stmt.executeBatch();
conn.commit();
}
} catch (SQLException e) {
log.error(e.getMessage(), e);
try {
conn.rollback();
} catch (SQLException ex) {
}
} finally {
DBUtils.closeResource(null, stmt, conn);
}
}
/**
* 是否使用同一个PreparedStatement
*/
private boolean determineUseSamePreparedStatement(RecordChangeEvent curEvent, RecordChangeEvent prevEvent) {
if (prevEvent == null) {
return false;
}
return curEvent.tableName.equals(prevEvent.tableName) && curEvent.op == prevEvent.op;
}
private static Object[] resolveColumns(Struct colsStruct) {
if (colsStruct == null) {
return null;
}
Schema schema = colsStruct.schema();
Object[] cols = new Object[schema.fields().size()];
for (int i = 0; i < schema.fields().size(); i++) {
Field field = schema.fields().get(i);
Object value = colsStruct.get(field);
value = convert(value, field.schema());
cols[i] = value;
}
return cols;
}
private static Object convert(Object value, Schema fieldSchema) {
if (value == null) {
return null;
}
if (fieldSchema.name() == null) {
if (fieldSchema.type() == Schema.Type.STRING) {
return value;
}
if (fieldSchema.type() == Schema.Type.INT32) {
return value;
}
if (fieldSchema.type() == Schema.Type.BYTES) {
return value;
}
}
switch (fieldSchema.name()) {
case "org.apache.kafka.connect.data.Decimal":
return value;
case "io.debezium.time.Timestamp":
return new Timestamp((Long) value);
default:
throw new IllegalStateException("暂不支持这种数据类型的字段:" + fieldSchema.name());
}
}
private static String buildInsertSql(TableMetadata tableMetadata) {
StringBuilder sql = new StringBuilder();
sql.append("INSERT INTO ");
sql.append(tableMetadata.getTableName());
sql.append("(");
sql.append(String.join(",", tableMetadata.getColumnNames()));
sql.append(") VALUES (");
for (int i = 0; i < tableMetadata.getColumnCount(); i++) {
sql.append("?");
if (i < tableMetadata.getColumnCount() - 1) {
sql.append(",");
}
}
sql.append(")");
return sql.toString();
}
private static String buildUpdateSql(TableMetadata tableMetadata) {
StringBuilder sql = new StringBuilder();
sql.append("UPDATE ");
sql.append(tableMetadata.getTableName());
sql.append(" SET ");
for (int i = 0; i < tableMetadata.getColumnCount(); i++) {
sql.append(tableMetadata.getColumnNames()[i]);
sql.append(" = ");
sql.append("?");
if (i < tableMetadata.getColumnCount() - 1) {
sql.append(", ");
}
}
sql.append(" WHERE ");
String[] keys = tableMetadata.getKeysArray();
for (int i = 0; i < keys.length; i++) {
sql.append(keys[i]);
sql.append(" = ");
sql.append("?");
if (i < keys.length - 1) {
sql.append(" AND ");
}
}
return sql.toString();
}
private static String buildDeleteSql(TableMetadata tableMetadata) {
StringBuilder sql = new StringBuilder();
sql.append("DELETE FROM ");
sql.append(tableMetadata.getTableName());
sql.append(" WHERE ");
String[] keys = tableMetadata.getKeysArray();
for (int i = 0; i < keys.length; i++) {
sql.append(keys[i]);
sql.append(" = ");
sql.append("?");
if (i < keys.length - 1) {
sql.append(" AND ");
}
}
return sql.toString();
}
}
package com.tbyd.data.datasync.core;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public interface RecordHandler {
boolean supports(ChangeEvent<SourceRecord, SourceRecord> record);
void handle(ChangeEvent<SourceRecord, SourceRecord> record);
default void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws Exception {
for (ChangeEvent<SourceRecord, SourceRecord> record : records) {
committer.markProcessed(record);
if (supports(record)) {
handle(record);
}
}
committer.markBatchFinished();
}
}
package com.tbyd.data.datasync.core;
import lombok.Data;
import java.util.Map;
import java.util.Set;
@Data
public class TableMetadata {
private String tableName;
private int columnCount;
private String[] columnNames;
private Map<String, Integer> columnIndexes;
private Set<String> keys;
public Integer getIndex(String column) {
return columnIndexes.get(column);
}
public String[] getKeysArray() {
return keys.toArray(new String[0]);
}
}
name=engine
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.history=io.debezium.relational.history.FileDatabaseHistory
snapshot.mode=schema_only
database.server.name=server1
offset.storage.file.filename=offset.dat
database.history.file.filename=dbhistory.dat
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 定义文件Appender -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 日志文件路径 -->
<file>logs/error.log</file>
<!-- 滚动策略 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 文件名模式 -->
<fileNamePattern>logs/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 保留的最大历史日志文件数 -->
<maxHistory>30</maxHistory>
</rollingPolicy>
<!-- 日志输出格式 -->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 配置Logger,将ERROR级别日志输出到文件 -->
<logger name="com.tbyd.data.datasync" level="ERROR" additivity="false">
<appender-ref ref="FILE" />
</logger>
<!-- 配置Root Logger(可选) -->
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment