Commit bfe44eee by y1sa

首次提交

parents
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
target/
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="file://$PROJECT_DIR$/common/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/common/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/core/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/core/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/mysql-cdc/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/mysql-cdc/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/oracle-cdc/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/oracle-cdc/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/sqlserver-cdc/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/sqlserver-cdc/src/main/resources" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
<option name="ignoredFiles">
<set>
<option value="$PROJECT_DIR$/common/pom.xml" />
</set>
</option>
</component>
<component name="ProjectInspectionProfilesVisibleTreeState">
<entry key="Project Default">
<profile-state>
<expanded-state>
<State />
<State>
<id>CodePlugin DevKit</id>
</State>
<State>
<id>GeneralJavaScript and TypeScript</id>
</State>
<State>
<id>JavaScript and TypeScript</id>
</State>
<State>
<id>Kotlin</id>
</State>
<State>
<id>Plugin DevKit</id>
</State>
<State>
<id>Plugin descriptorPlugin DevKit</id>
</State>
<State>
<id>Style issuesKotlin</id>
</State>
</expanded-state>
<selected-state>
<State>
<id>Angular</id>
</State>
</selected-state>
</profile-state>
</entry>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Palette2">
<group name="Swing">
<item class="com.intellij.uiDesigner.HSpacer" tooltip-text="Horizontal Spacer" icon="/com/intellij/uiDesigner/icons/hspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="1" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="com.intellij.uiDesigner.VSpacer" tooltip-text="Vertical Spacer" icon="/com/intellij/uiDesigner/icons/vspacer.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="1" anchor="0" fill="2" />
</item>
<item class="javax.swing.JPanel" icon="/com/intellij/uiDesigner/icons/panel.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3" />
</item>
<item class="javax.swing.JScrollPane" icon="/com/intellij/uiDesigner/icons/scrollPane.svg" removable="false" auto-create-binding="false" can-attach-label="true">
<default-constraints vsize-policy="7" hsize-policy="7" anchor="0" fill="3" />
</item>
<item class="javax.swing.JButton" icon="/com/intellij/uiDesigner/icons/button.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="0" fill="1" />
<initial-values>
<property name="text" value="Button" />
</initial-values>
</item>
<item class="javax.swing.JRadioButton" icon="/com/intellij/uiDesigner/icons/radioButton.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="RadioButton" />
</initial-values>
</item>
<item class="javax.swing.JCheckBox" icon="/com/intellij/uiDesigner/icons/checkBox.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="3" anchor="8" fill="0" />
<initial-values>
<property name="text" value="CheckBox" />
</initial-values>
</item>
<item class="javax.swing.JLabel" icon="/com/intellij/uiDesigner/icons/label.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="8" fill="0" />
<initial-values>
<property name="text" value="Label" />
</initial-values>
</item>
<item class="javax.swing.JTextField" icon="/com/intellij/uiDesigner/icons/textField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JPasswordField" icon="/com/intellij/uiDesigner/icons/passwordField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JFormattedTextField" icon="/com/intellij/uiDesigner/icons/formattedTextField.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1">
<preferred-size width="150" height="-1" />
</default-constraints>
</item>
<item class="javax.swing.JTextArea" icon="/com/intellij/uiDesigner/icons/textArea.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTextPane" icon="/com/intellij/uiDesigner/icons/textPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JEditorPane" icon="/com/intellij/uiDesigner/icons/editorPane.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JComboBox" icon="/com/intellij/uiDesigner/icons/comboBox.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="2" anchor="8" fill="1" />
</item>
<item class="javax.swing.JTable" icon="/com/intellij/uiDesigner/icons/table.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JList" icon="/com/intellij/uiDesigner/icons/list.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="2" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTree" icon="/com/intellij/uiDesigner/icons/tree.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3">
<preferred-size width="150" height="50" />
</default-constraints>
</item>
<item class="javax.swing.JTabbedPane" icon="/com/intellij/uiDesigner/icons/tabbedPane.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSplitPane" icon="/com/intellij/uiDesigner/icons/splitPane.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="3" hsize-policy="3" anchor="0" fill="3">
<preferred-size width="200" height="200" />
</default-constraints>
</item>
<item class="javax.swing.JSpinner" icon="/com/intellij/uiDesigner/icons/spinner.svg" removable="false" auto-create-binding="true" can-attach-label="true">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSlider" icon="/com/intellij/uiDesigner/icons/slider.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="8" fill="1" />
</item>
<item class="javax.swing.JSeparator" icon="/com/intellij/uiDesigner/icons/separator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="6" anchor="0" fill="3" />
</item>
<item class="javax.swing.JProgressBar" icon="/com/intellij/uiDesigner/icons/progressbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1" />
</item>
<item class="javax.swing.JToolBar" icon="/com/intellij/uiDesigner/icons/toolbar.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="6" anchor="0" fill="1">
<preferred-size width="-1" height="20" />
</default-constraints>
</item>
<item class="javax.swing.JToolBar$Separator" icon="/com/intellij/uiDesigner/icons/toolbarSeparator.svg" removable="false" auto-create-binding="false" can-attach-label="false">
<default-constraints vsize-policy="0" hsize-policy="0" anchor="0" fill="1" />
</item>
<item class="javax.swing.JScrollBar" icon="/com/intellij/uiDesigner/icons/scrollbar.svg" removable="false" auto-create-binding="true" can-attach-label="false">
<default-constraints vsize-policy="6" hsize-policy="0" anchor="0" fill="2" />
</item>
</group>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>cdc-engine2</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>core</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.tbyf.cdcengine2.core;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Object startupStopMonitor = new Object();
private volatile DebeziumEngine<?> debeziumEngine;
private volatile ExecutorService executor;
protected final Properties debeziumProps = new Properties();
private final BlockingQueue<ChangedRecord> recordQueue = new ArrayBlockingQueue<>(10000);
protected ChangeEventAdapter adapter() {
return DefaultChangeEventAdapter.INSTANCE;
}
private volatile Thread consumerThread;
private ChangeHandler handler;
private volatile boolean running = true;
public interface ChangeHandler {
void process(ChangedRecord record) throws Exception;
}
@SuppressWarnings("unchecked")
public T changeHandler(ChangeHandler handler) {
this.handler = handler;
return (T) this;
}
public void start() {
synchronized (startupStopMonitor) {
ChangeHandler handler0 = this.handler;
if (handler0 == null) {
throw new IllegalStateException("recordConsumer not set");
}
consumerThread = new Thread(() -> {
while (running) {
try {
List<ChangedRecord> buffer = new ArrayList<>();
recordQueue.drainTo(buffer);
if (buffer.isEmpty()) {
ChangedRecord record = recordQueue.take();
try {
handler0.process(record);
} catch (Exception e) {
logger.error("处理{}时发生了异常", record, e);
}
} else {
for (ChangedRecord record : buffer) {
try {
handler0.process(record);
} catch (Exception e) {
logger.error("处理{}时发生了异常", record, e);
}
}
}
} catch (InterruptedException e) {
}
}
// 消费完剩余的记录
List<ChangedRecord> buffer = new ArrayList<>();
recordQueue.drainTo(buffer);
if (!buffer.isEmpty()) {
for (ChangedRecord record : buffer) {
try {
handler0.process(record);
} catch (Exception e) {
logger.error("处理{}时发生了异常", record, e);
}
}
}
}, "cdc-consumer");
consumerThread.start();
debeziumEngine = DebeziumEngine.create(Connect.class)
.using(OffsetCommitPolicy.always())
.using(debeziumProps)
.using(((success, message, error) -> {
if (!success) {
logger.error("启动失败, 原因: {}", message, error);
stop();
}
}))
.notifying(event -> {
try {
ChangedRecord changedRecord = adapter().adapt(event);
if (changedRecord != null) {
recordQueue.put(changedRecord);
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}).build();
executor = Executors.newSingleThreadExecutor();
executor.execute(debeziumEngine);
}
}
public void stop() {
synchronized (startupStopMonitor) {
running = false;
DebeziumEngine<?> debeziumEngine = this.debeziumEngine;
if (debeziumEngine != null) {
try {
debeziumEngine.close();
} catch (IOException e) {
logger.error("关闭时发生了错误", e);
}
}
this.debeziumEngine = null;
ExecutorService executor = this.executor;
if (executor != null) {
executor.shutdown();
}
this.executor = null;
Thread consumerThread = this.consumerThread;
if (consumerThread != null && consumerThread.isAlive()) {
consumerThread.interrupt();
}
this.consumerThread = null;
}
}
}
package com.tbyf.cdcengine2.core;
import io.debezium.engine.ChangeEvent;
import org.apache.kafka.connect.source.SourceRecord;
/**
* ChangeEvent -> ChangedRecord
*/
public interface ChangeEventAdapter {
/**
* 如果返回null, 表示不处理这个事件
*
* @param event
* @return
*/
ChangedRecord adapt(ChangeEvent<SourceRecord, SourceRecord> event) throws Exception;
}
package com.tbyf.cdcengine2.core;
import java.util.Map;
import java.util.Objects;
public class ChangedRecord {
// DATACENTER.ADAPTER_CDC_COLUMN, 其中DATACENTER是schema, ADAPTER_CDC_COLUMN是table
private String schema;
private String table;
private Map<String, Object> before;
private Map<String, Object> now;
private OperationType operationType;
public String table() {
return table;
}
public ChangedRecord table(String table) {
this.table = table;
return this;
}
public ChangedRecord schema(String schema) {
this.schema = schema;
return this;
}
public String schema() {
return schema;
}
public boolean hasChanged() {
if (before == null || now == null) {
return true;
}
for (Map.Entry<String, Object> entry : before.entrySet()) {
String key = entry.getKey();
Object beforeValue = entry.getValue();
Object nowValue = now.get(key);
if (!Objects.equals(beforeValue, nowValue)) {
return true;
}
}
return false;
}
public ChangedRecord before(Map<String, Object> before) {
this.before = before;
return this;
}
public ChangedRecord now(Map<String, Object> now) {
this.now = now;
return this;
}
public Map<String, Object> before() {
return before;
}
public Map<String, Object> now() {
return now;
}
public OperationType operationType() {
return operationType;
}
public ChangedRecord operationType(OperationType operationType) {
this.operationType = operationType;
return this;
}
@Override
public String toString() {
return "ChangedRecord{" +
"before=" + before +
", now=" + now +
", operationType=" + operationType +
'}';
}
}
package com.tbyf.cdcengine2.core;
import io.debezium.data.Envelope;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.engine.ChangeEvent;
import io.debezium.time.Timestamp;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
public class DefaultChangeEventAdapter implements ChangeEventAdapter {
public static final DefaultChangeEventAdapter INSTANCE = new DefaultChangeEventAdapter();
@Override
public ChangedRecord adapt(ChangeEvent<SourceRecord, SourceRecord> event) throws Exception {
SourceRecord sr = event.value();
Struct value = (Struct) sr.value();
if (value == null) {
return null;
}
Schema valueSchema = sr.valueSchema();
if (!valueSchema.name().endsWith(".Envelope")) {
return null;
}
ChangedRecord record = new ChangedRecord();
String destination = event.destination();
String schemaTable = destination.substring(destination.indexOf(".") + 1);
String[] split = schemaTable.split("\\.");
record.schema(split[0])
.table(split[1]);
Struct before = value.getStruct(valueSchema.field(Envelope.FieldName.BEFORE).name());
Struct now = value.getStruct(valueSchema.field(Envelope.FieldName.AFTER).name());
String op = value.getString(valueSchema.field(Envelope.FieldName.OPERATION).name());
Envelope.Operation operation = Envelope.Operation.forCode(op);
if (operation == Envelope.Operation.CREATE) {
record.operationType(OperationType.CREATE);
} else if (operation == Envelope.Operation.UPDATE) {
record.operationType(OperationType.UPDATE);
} else if (operation == Envelope.Operation.DELETE) {
record.operationType(OperationType.DELETE);
} else {
return null;
}
record.before(resolveColumns(before))
.now(resolveColumns(now));
return record;
}
private Map<String, Object> resolveColumns(Struct cols) {
if (cols == null) {
return null;
}
HashMap<String, Object> result = new HashMap<>();
Schema schema = cols.schema();
for (Field field : schema.fields()) {
Object value = cols.get(field);
if (value != null) {
if (field.schema().type() == Schema.Type.STRUCT) {
Struct structValue = (Struct) value;
// 处理小数类型
if (field.schema().name().equals(VariableScaleDecimal.class.getName())) {
value = new BigDecimal(new BigInteger(structValue.getBytes(VariableScaleDecimal.VALUE_FIELD)),
structValue.getInt32(VariableScaleDecimal.SCALE_FIELD));
}
}
if (field.schema().type() == Schema.Type.INT64) {
Long longValue = (Long) value;
// 处理时间类型
if (field.schema().name().equals(Timestamp.class.getName())) {
value = LocalDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneOffset.UTC);
}
}
// todo 考虑其他情况
}
result.put(field.name(), value);
}
return result;
}
}
package com.tbyf.cdcengine2.core;
/**
* 操作类型
*/
public enum OperationType {
CREATE,
UPDATE,
DELETE
}
{"source":{"server":"server1"},"position":{"snapshot_scn":"2425144322","snapshot":true,"scn":"2425144322","snapshot_completed":false},"databaseName":"ORCL","schemaName":"DATACENTER","ddl":"\n CREATE TABLE \"DATACENTER\".\"ADAPTER_CDC_COLUMN\" \n (\t\"ID\" NUMBER NOT NULL ENABLE, \n\t\"DATASET_CODE\" VARCHAR2(255), \n\t\"METADATA_CODE\" VARCHAR2(255), \n\t\"CONVERT_MODE\" CHAR(1) DEFAULT '4', \n\t\"SOURCE_FIELD\" VARCHAR2(255), \n\t\"CONVERT_VALUE\" VARCHAR2(255) DEFAULT '', \n\t SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n ) ;","tableChanges":[{"type":"CREATE","id":"\"ORCL\".\"DATACENTER\".\"ADAPTER_CDC_COLUMN\"","table":{"defaultCharsetName":null,"primaryKeyColumnNames":[],"columns":[{"name":"ID","jdbcType":2,"typeName":"NUMBER","typeExpression":"NUMBER","charsetName":null,"length":0,"position":1,"optional":false,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":false,"enumValues":[]},{"name":"DATASET_CODE","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":2,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"METADATA_CODE","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":3,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CONVERT_MODE","jdbcType":1,"typeName":"CHAR","typeExpression":"CHAR","charsetName":null,"length":1,"position":4,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"defaultValueExpression":"'4'","enumValues":[]},{"name":"SOURCE_FIELD","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":5,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CONVERT_VALUE","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":6,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"defaultValueExpression":"''\r\n","enumValues":[]}]},"comment":null}]}
{"source":{"server":"server1"},"position":{"snapshot_scn":"2425144524","snapshot":true,"scn":"2425144524","snapshot_completed":false},"databaseName":"ORCL","schemaName":"DATACENTER","ddl":"\n CREATE TABLE \"DATACENTER\".\"ADAPTER_CDC_COLUMN\" \n (\t\"ID\" NUMBER NOT NULL ENABLE, \n\t\"DATASET_CODE\" VARCHAR2(255), \n\t\"METADATA_CODE\" VARCHAR2(255), \n\t\"CONVERT_MODE\" CHAR(1) DEFAULT '4', \n\t\"SOURCE_FIELD\" VARCHAR2(255), \n\t\"CONVERT_VALUE\" VARCHAR2(255) DEFAULT '', \n\t SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n ) ;","tableChanges":[{"type":"CREATE","id":"\"ORCL\".\"DATACENTER\".\"ADAPTER_CDC_COLUMN\"","table":{"defaultCharsetName":null,"primaryKeyColumnNames":[],"columns":[{"name":"ID","jdbcType":2,"typeName":"NUMBER","typeExpression":"NUMBER","charsetName":null,"length":0,"position":1,"optional":false,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":false,"enumValues":[]},{"name":"DATASET_CODE","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":2,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"METADATA_CODE","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":3,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CONVERT_MODE","jdbcType":1,"typeName":"CHAR","typeExpression":"CHAR","charsetName":null,"length":1,"position":4,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"defaultValueExpression":"'4'","enumValues":[]},{"name":"SOURCE_FIELD","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":5,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"CONVERT_VALUE","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":255,"position":6,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"defaultValueExpression":"''\r\n","enumValues":[]}]},"comment":null}]}
File added
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>cdc-engine2</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>mysql-cdc</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>cdc-engine2</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>oracle-cdc</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>core</artifactId>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
</dependency>
<dependency>
<groupId>com.oracle.database.nls</groupId>
<artifactId>orai18n</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.tbyf.cdcengine2.oracle;
import com.tbyf.cdcengine2.core.AbstractCdcEngine;
import com.tbyf.cdcengine2.core.ChangeEventAdapter;
import com.tbyf.cdcengine2.core.DefaultChangeEventAdapter;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.Collectors;
public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> {
static {
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("未找到Oracle jdbc驱动", e);
}
}
public OracleCdcEngine(OracleCdcEngineProperties props) {
debeziumProps.setProperty("connector.class", OracleConnector.class.getName());
debeziumProps.setProperty("database.history", FileDatabaseHistory.class.getName());
debeziumProps.setProperty("database.history.file.filename", props.namespace() + "_his.dat");
debeziumProps.setProperty("offset.storage.file.filename", props.namespace() + "_off.dat");
debeziumProps.setProperty("database.hostname", props.host());
debeziumProps.setProperty("database.port", String.valueOf(props.port()));
debeziumProps.setProperty("database.user", props.username());
debeziumProps.setProperty("database.password", props.password());
debeziumProps.setProperty("database.dbname", props.dbname());
debeziumProps.setProperty("log.mining.strategy", "online_catalog");
debeziumProps.setProperty("snapshot.mode", "schema_only");
debeziumProps.setProperty("database.history.skip.unparseable.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.history.store.only.captured.tables.ddl", Boolean.toString(true));
debeziumProps.setProperty("database.server.name", "server1");
debeziumProps.setProperty("name", "oracle_cdc_engine");
String schema = props.username().toUpperCase();
debeziumProps.setProperty("schema.include.list", schema);
String tables = new HashSet<>(Arrays.asList(props.cdcTables().toUpperCase().split(",")))
.stream().map(tableName -> schema + "." + tableName)
.collect(Collectors.joining(","));
debeziumProps.setProperty("table.include.list", tables);
// 开启supplemental logging
String url = buildOracleJdbcUrl(props.host(), props.port(), props.dbname());
SupplementalLoggingUtils.enableDBSupplementalLogging(getConn(url, props.username(), props.password()));
SupplementalLoggingUtils.enableSchemaTablesSupplementalLogging(getConn(url, props.username(), props.password()), props.username(),
new HashSet<>(Arrays.asList(props.cdcTables().split(","))));
}
private static Connection getConn(String url, String username, String password) {
try {
return DriverManager.getConnection(url, username, password);
} catch (SQLException e) {
throw new IllegalStateException("获取数据库连接失败", e);
}
}
public static String buildOracleJdbcUrl(String host, int port, String dbname) {
return String.format("jdbc:oracle:thin:@%s:%d:%s", host, port, dbname);
}
}
package com.tbyf.cdcengine2.oracle;
public class OracleCdcEngineProperties {
private String host;
private int port;
private String dbname;
private String username;
private String password;
private String cdcTables;
private String namespace;
public OracleCdcEngineProperties namespace(String namespace) {
this.namespace = namespace;
return this;
}
public String namespace() {
if (namespace == null) {
return "default";
}
return namespace;
}
public OracleCdcEngineProperties host(String host) {
this.host = host;
return this;
}
public OracleCdcEngineProperties port(int port) {
this.port = port;
return this;
}
public OracleCdcEngineProperties dbname(String dbname) {
this.dbname = dbname;
return this;
}
public OracleCdcEngineProperties username(String username) {
this.username = username;
return this;
}
public OracleCdcEngineProperties password(String password) {
this.password = password;
return this;
}
public OracleCdcEngineProperties cdcTables(String cdcTables) {
this.cdcTables = cdcTables;
return this;
}
public String host() {
return host;
}
public int port() {
return port;
}
public String dbname() {
return dbname;
}
public String username() {
return username;
}
public String password() {
return password;
}
public String cdcTables() {
return cdcTables;
}
}
package com.tbyf.cdcengine2.oracle;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 开启数据库或表的supplemental logging的工具类
*/
public class SupplementalLoggingUtils {
/**
* 查询数据库supplemental logging开启状态SQL
*/
private static final String QUERY_DB_SUPPLEMENTAL_LOGGING_SQL = "SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE";
/**
* 开启数据库supplemental logging SQL
*/
private static final String ENABLE_DB_SUPPLEMENTAL_LOGGING_SQL = "ALTER DATABASE ADD SUPPLEMENTAL LOG DATA";
/**
* 查询所有已开启了supplemental logging的表的SQL
*/
private static final String QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL = "SELECT OWNER, TABLE_NAME FROM ALL_LOG_GROUPS WHERE LOG_GROUP_TYPE = 'ALL COLUMN LOGGING'";
/**
* 开启表的supplemental logging SQL
*/
private static final String ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL = "ALTER TABLE %s.%s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS";
/**
* 开启Oracle数据库supplemental logging
*
* @param conn
*/
public static void enableDBSupplementalLogging(Connection conn) {
try {
if (!isDBSupplementalLoggingEnabled(conn)) {
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(ENABLE_DB_SUPPLEMENTAL_LOGGING_SQL);
}
}
} catch (Exception e) {
throw new IllegalStateException("开启数据库supplemental logging时发生了异常", e);
}
}
private static boolean isDBSupplementalLoggingEnabled(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery(QUERY_DB_SUPPLEMENTAL_LOGGING_SQL)) {
if (rs.next()) {
String result = rs.getString(1);
return "YES".equals(result) || "IMPLICIT".equals(result);
} else {
throw new SQLException("无法查询数据库supplemental logging开启状态");
}
}
}
}
/**
* 批量开启schema下多个表的supplemental logging
*/
public static void enableSchemaTablesSupplementalLogging(Connection conn, String schema, Set<String> tables) {
try {
schema = schema.toUpperCase();
Set<String> needEnableTables = tables.stream().map(String::toUpperCase).collect(Collectors.toSet());
Set<String> enabledTables = getSchemaSupplementalLoggingEnabledTables(conn, schema);
needEnableTables.removeAll(enabledTables);
if (!needEnableTables.isEmpty()) {
try (Statement stmt = conn.createStatement()) {
for (String table : needEnableTables) {
stmt.executeUpdate(String.format(ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL, schema, table));
}
}
}
} catch (SQLException e) {
throw new IllegalStateException("开启表supplemental logging时发生了异常", e);
}
}
private static Set<String> getSchemaSupplementalLoggingEnabledTables(Connection conn, String schema) throws SQLException {
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery(QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL)) {
Set<String> tables = new HashSet<>();
while (rs.next()) {
if (schema.equals(rs.getString(1))) {
tables.add(rs.getString(2));
}
}
return tables;
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>cdc-engine2</artifactId>
<version>1.0.0</version>
<packaging>pom</packaging>
<modules>
<module>oracle-cdc</module>
<module>core</module>
<module>mysql-cdc</module>
<module>sqlserver-cdc</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<debezium.version>1.9.0.Final</debezium.version>
<engine.version>1.0.0</engine.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.14</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>core</artifactId>
<version>${engine.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.tbyf.cdcengine2</groupId>
<artifactId>cdc-engine2</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>sqlserver-cdc</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment