Commit a2d93f1c by y1sa

初始化

parents
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store
.idea
\ No newline at end of file
source.host=172.16.51.95
source.port=1521
source.dbname=ORCL
source.user=datacenter
source.password=data
target.url=jdbc:oracle:thin:@172.16.51.95:1521:ORCL
target.user=datacenter
target.password=data
debezium.log.mining.batch.size.max=1000000
debezium.log.mining.batch.size.default=100000
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tbyf</groupId>
<artifactId>oracle-cdc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<main-class>com.tbyf.oraclecdc.Main</main-class>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<debezium.version>1.9.0.Final</debezium.version>
<oracle.version>19.3.0.0</oracle.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.ojdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${oracle.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
</dependency>
<dependency>
<groupId>com.oracle.ojdbc</groupId>
<artifactId>ojdbc8</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.22</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>${main-class}</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.tbyf.oraclecdc;
import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;
/**
* 号源剩余量同步
*/
public class AppointmentQuotaSynchronizer implements RecordChangeHandler {
private static final Logger log = LoggerFactory.getLogger(AppointmentQuotaSynchronizer.class);
private final DruidDataSource dataSource;
private static final String sql = "UPDATE T_SHIFT_SCHEDULE SET SURPLUS_TOTAL = ? WHERE Shift_Code = ?";
public AppointmentQuotaSynchronizer() {
this.dataSource = new DruidDataSource();
this.dataSource.setUrl(Config.TARGET_URL);
this.dataSource.setUsername(Config.TARGET_USER);
this.dataSource.setPassword(Config.TARGET_PASSWORD);
}
@Override
public void handleChanges(List<RecordChange> changes) throws Exception {
for (RecordChange change : changes) {
if (change.getOpType() == RecordChange.OpType.UPDATE) {
try {
Map<String, Object> after = change.getAfter();
BigDecimal ZXE = (BigDecimal) after.get("ZXE"); // 总号源量
BigDecimal GHRS = (BigDecimal) after.get("GHRS"); // 号源使用量
Map<String, Object> before = change.getBefore();
Object beforeZXE = before.get("ZXE");
Object beforeGHRS = before.get("GHRS");
if (beforeZXE.equals(ZXE) && beforeGHRS.equals(GHRS)) {
continue;
}
int remaining = ZXE.intValue() - GHRS.intValue(); // 号源剩余量
String id = (String) after.get("ID");
if (id != null && !id.trim().isEmpty()) {
try (Connection conn = dataSource.getConnection()) {
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setObject(1, remaining);
stmt.setString(2, id);
log.info("执行sql: {}", sql);
log.info("参数: SURPLUS_TOTAL[{}], Shift_Code[{}]", remaining, id);
stmt.executeUpdate();
}
}
}
} catch (Exception e) {
log.error("处理记录变更发生错误, 变更记录:{}", change, e);
}
}
}
}
@Override
public void close() {
dataSource.close();
}
}
package com.tbyf.oraclecdc;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class CdcProperties {
private String host;
private int port;
private String dbname;
private String user;
private String password;
private String offsetFileLocation;
private String historyFileLocation;
private String tables;
private boolean autoEnableSupplementLogging = true;
private int channelCapacity = 20000;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDbname() {
return dbname;
}
public void setDbname(String dbname) {
this.dbname = dbname;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getChannelCapacity() {
return channelCapacity;
}
public void setChannelCapacity(int channelCapacity) {
this.channelCapacity = channelCapacity;
}
public String getOffsetFileLocation() {
String offsetFileLocation = this.offsetFileLocation;
if (offsetFileLocation == null) {
offsetFileLocation = "cdc/offset/" + host + "_" + port + "/" + user + ".off";
}
return offsetFileLocation;
}
public void setOffsetFileLocation(String offsetFileLocation) {
this.offsetFileLocation = offsetFileLocation;
}
public String getHistoryFileLocation() {
String historyFileLocation = this.historyFileLocation;
if (historyFileLocation == null) {
historyFileLocation = "cdc/history/" + host + "_" + port + "/" + user + ".his";
}
return historyFileLocation;
}
public void setHistoryFileLocation(String historyFileLocation) {
this.historyFileLocation = historyFileLocation;
}
public String getTables() {
return tables;
}
public void setTables(String tables) {
this.tables = tables;
}
public boolean isAutoEnableSupplementLogging() {
return autoEnableSupplementLogging;
}
public void setAutoEnableSupplementLogging(boolean autoEnableSupplementLogging) {
this.autoEnableSupplementLogging = autoEnableSupplementLogging;
}
public void validate() {
// todo
}
public Properties asDebeziumProps() {
Properties props = new Properties();
props.putAll(Config.DEBEZIUM_PROPERTIES);
props.setProperty("connector.class", OracleConnector.class.getName());
props.setProperty("database.history", FileDatabaseHistory.class.getName());
props.setProperty("database.history.file.filename", getHistoryFileLocation());
props.setProperty("offset.storage.file.filename", getOffsetFileLocation());
props.setProperty("database.hostname", host);
props.setProperty("database.port", String.valueOf(port));
props.setProperty("database.user", user);
props.setProperty("database.password", password);
props.setProperty("database.dbname", dbname);
props.setProperty("log.mining.strategy", "online_catalog");
props.setProperty("snapshot.mode", "schema_only");
props.setProperty("database.history.skip.unparseable.ddl", Boolean.toString(true));
props.setProperty("database.history.store.only.captured.tables.ddl", Boolean.toString(true));
props.setProperty("database.server.name", "server1");
props.setProperty("name", "oracle_cdc_engine");
props.setProperty("schema.include.list", user);
Set<String> schemaTables = new HashSet<>();
for (String table : tables.split(",")) {
schemaTables.add(user + "." + table);
}
props.setProperty("table.include.list", String.join(",", schemaTables));
return props;
}
}
package com.tbyf.oraclecdc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Channel {
private final BlockingQueue<RecordChange> queue;
public Channel(int capacity) {
this.queue = new ArrayBlockingQueue<>(capacity);
}
public void put(RecordChange recordChange) {
try {
queue.put(recordChange);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public List<RecordChange> takeAll() {
List<RecordChange> result = new ArrayList<>();
// todo 暂定最多一次拿2048条变更记录
queue.drainTo(result, 2048);
return result;
}
public RecordChange take() {
try {
return queue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
package com.tbyf.oraclecdc;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
public class Config {
public static String SOURCE_HOST = "172.16.51.95";
public static int SOURCE_PORT = 1521;
public static String SOURCE_DB_NAME = "ORCL";
public static String SOURCE_USER = "datacenter";
public static String SOURCE_PASSWORD = "data";
public static String TARGET_URL = "jdbc:oracle:thin:@172.16.51.95:1521:ORCL";
public static String TARGET_USER = "datacenter";
public static String TARGET_PASSWORD = "data";
public static Properties DEBEZIUM_PROPERTIES = new Properties();
private static final String DEBEZIUM_PROPERTY_PREFIX = "debezium.";
public static void init(String configLocation) {
try (FileInputStream is = new FileInputStream(configLocation)) {
Properties props = new Properties();
props.load(is);
SOURCE_HOST = props.getProperty("source.host");
SOURCE_PORT = Integer.parseInt(props.getProperty("source.port"));
SOURCE_DB_NAME = props.getProperty("source.dbname");
SOURCE_USER = props.getProperty("source.user");
SOURCE_PASSWORD = props.getProperty("source.password");
TARGET_URL = props.getProperty("target.url");
TARGET_USER = props.getProperty("target.user");
TARGET_PASSWORD = props.getProperty("target.password");
for (Map.Entry<Object, Object> entry : props.entrySet()) {
String key = (String) entry.getKey();
if (key.startsWith(DEBEZIUM_PROPERTY_PREFIX)) {
DEBEZIUM_PROPERTIES.put(key.substring(DEBEZIUM_PROPERTY_PREFIX.length()), entry.getValue());
}
}
} catch (IOException e) {
throw new IllegalStateException("读取配置文件失败", e);
}
}
}
package com.tbyf.oraclecdc;
public class Main {
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("请指定配置文件");
return;
}
Config.init(args[0]);
CdcProperties cdcProperties = new CdcProperties();
cdcProperties.setHost(Config.SOURCE_HOST);
cdcProperties.setPort(Config.SOURCE_PORT);
cdcProperties.setDbname(Config.SOURCE_DB_NAME);
cdcProperties.setUser(Config.SOURCE_USER);
cdcProperties.setPassword(Config.SOURCE_PASSWORD);
cdcProperties.setTables("MZGH_NYSPBMX");
OracleCdcEngine engine = new OracleCdcEngine(cdcProperties);
RecordChangeHandler recordChangeHandler = new AppointmentQuotaSynchronizer();
engine.setRecordChangeHandler(recordChangeHandler);
engine.init();
engine.start();
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
engine.stop();
recordChangeHandler.close();
}, "oracle_cdc_engine_shutdown_hook"));
}
}
package com.tbyf.oraclecdc;
import com.tbyf.oraclecdc.util.CdcHelper;
import com.tbyf.oraclecdc.util.DBUtils;
import com.tbyf.oraclecdc.util.DateUtils;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class OracleCdcEngine {
private static final Logger log = LoggerFactory.getLogger(OracleCdcEngine.class);
private volatile DebeziumEngine<?> debeziumEngine;
private volatile ExecutorService executor;
private volatile Properties props;
private final CdcProperties cdcProperties;
private volatile RecordChangeHandler recordChangeHandler;
private volatile Thread changeConsumerThread;
static {
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("未找到Oracle jdbc驱动", e);
}
}
public OracleCdcEngine(CdcProperties cdcProperties) {
this.cdcProperties = cdcProperties;
}
public void init() {
cdcProperties.validate();
if (cdcProperties.isAutoEnableSupplementLogging()) {
String url = DBUtils.buildOracleJdbcUrl(cdcProperties.getHost(), cdcProperties.getPort(), cdcProperties.getDbname());
String user = cdcProperties.getUser();
String password = cdcProperties.getPassword();
try (Connection conn = DriverManager.getConnection(url, user, password)) {
CdcHelper.enableDBSupplementalLogging(conn);
CdcHelper.enableSchemaTablesSupplementalLogging(conn, cdcProperties.getUser(),
new HashSet<>(Arrays.asList(cdcProperties.getTables().split(","))));
} catch (SQLException ex) {
throw new IllegalStateException("获取数据库连接失败", ex);
}
}
mkParentDirs(cdcProperties.getOffsetFileLocation());
mkParentDirs(cdcProperties.getHistoryFileLocation());
props = cdcProperties.asDebeziumProps();
}
private static void mkParentDirs(String dirLocation) {
new File(dirLocation).getParentFile().mkdirs();
}
public void start() {
Properties props = this.props;
if (props == null) {
throw new IllegalStateException("Oracle CDC引擎未初始化");
}
final RecordChangeHandler recordChangeHandler = this.recordChangeHandler;
if (recordChangeHandler == null) {
throw new IllegalStateException("未设置RecordChangeHandler");
}
log.info("Oracle CDC引擎启动时间: {}, 引擎配置:{}", DateUtils.getNowAsString(), props);
final RecordChangeAdapter changeAdapter = new RecordChangeAdapter();
final Channel channel = new Channel(cdcProperties.getChannelCapacity());
debeziumEngine = DebeziumEngine.create(Connect.class)
.using(OffsetCommitPolicy.always())
.using(this.props)
.using((success, message, error) -> {
if (!success) {
log.error("Oracle CDC引擎启动失败: {}", message, error);
stop();
}
})
.notifying(changeEvent -> {
try {
RecordChange recordChange = changeAdapter.adapt(changeEvent);
if (recordChange != null) {
channel.put(recordChange);
}
} catch (Exception e) {
log.error("处理记录变更时发生了错误", e);
}
}).build();
executor = Executors.newSingleThreadExecutor();
executor.submit(debeziumEngine);
changeConsumerThread = new Thread(() -> {
while (true) {
List<RecordChange> recordChanges = channel.takeAll();
if (recordChanges.isEmpty()) {
RecordChange take = channel.take();
if (take == null) {
break;
}
recordChanges = Collections.singletonList(take);
}
try {
recordChangeHandler.handleChanges(recordChanges);
} catch (Exception e) {
log.error("处理记录变更时发生了错误", e);
}
}
}, "oracle_cdc_consumer");
changeConsumerThread.start();
}
public void setRecordChangeHandler(RecordChangeHandler recordChangeHandler) {
this.recordChangeHandler = recordChangeHandler;
}
public void stop() {
DebeziumEngine<?> debeziumEngine = this.debeziumEngine;
if (debeziumEngine != null) {
try {
debeziumEngine.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
// help gc
this.debeziumEngine = null;
ExecutorService executor = this.executor;
if (executor != null) {
executor.shutdown();
}
this.executor = null;
Thread changeConsumerThread = this.changeConsumerThread;
if (changeConsumerThread != null && changeConsumerThread.isAlive()) {
changeConsumerThread.interrupt();
}
this.changeConsumerThread = null;
}
}
package com.tbyf.oraclecdc;
import java.util.Map;
public class RecordChange {
private String schema;
private String table;
private Map<String, Object> before;
private Map<String, Object> after;
private OpType opType;
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public Map<String, Object> getBefore() {
return before;
}
public void setBefore(Map<String, Object> before) {
this.before = before;
}
public Map<String, Object> getAfter() {
return after;
}
public void setAfter(Map<String, Object> after) {
this.after = after;
}
public OpType getOpType() {
return opType;
}
public void setOpType(OpType opType) {
this.opType = opType;
}
public enum OpType {
CREATE,
DELETE,
UPDATE
}
@Override
public String toString() {
return "RecordChange{" +
"schema='" + schema + '\'' +
", table='" + table + '\'' +
", before=" + before +
", after=" + after +
", opType=" + opType +
'}';
}
}
package com.tbyf.oraclecdc;
import io.debezium.data.Envelope;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.engine.ChangeEvent;
import io.debezium.time.Timestamp;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
public class RecordChangeAdapter {
private static final Logger log = LoggerFactory.getLogger(RecordChangeAdapter.class);
/**
* 如果返回null则表示忽略这个变更事件
*
* @param changeEvent
* @return
*/
public RecordChange adapt(ChangeEvent<SourceRecord, SourceRecord> changeEvent) {
try {
SourceRecord sr = changeEvent.value();
Struct value = (Struct) sr.value();
if (value == null) {
return null;
}
Schema valueSchema = sr.valueSchema();
if (!valueSchema.name().endsWith(".Envelope")) {
return null;
}
RecordChange recordChange = new RecordChange();
String destination = changeEvent.destination();
String schemaTable = destination.substring(destination.indexOf(".") + 1);
String[] split = schemaTable.split("\\.");
recordChange.setSchema(split[0]);
recordChange.setTable(split[1]);
Struct before = value.getStruct(valueSchema.field(Envelope.FieldName.BEFORE).name());
Struct after = value.getStruct(valueSchema.field(Envelope.FieldName.AFTER).name());
String op = value.getString(valueSchema.field(Envelope.FieldName.OPERATION).name());
Envelope.Operation operation = Envelope.Operation.forCode(op);
if (operation == Envelope.Operation.CREATE) {
recordChange.setOpType(RecordChange.OpType.CREATE);
} else if (operation == Envelope.Operation.UPDATE) {
recordChange.setOpType(RecordChange.OpType.UPDATE);
} else if (operation == Envelope.Operation.DELETE) {
recordChange.setOpType(RecordChange.OpType.DELETE);
} else {
return null;
}
recordChange.setBefore(resolveColumns(before));
recordChange.setAfter(resolveColumns(after));
return recordChange;
} catch (Exception e) {
log.error(e.getMessage(), e);
return null;
}
}
private Map<String, Object> resolveColumns(Struct cols) {
if (cols == null) {
return null;
}
HashMap<String, Object> result = new HashMap<>();
Schema schema = cols.schema();
for (Field field : schema.fields()) {
Object value = cols.get(field);
if (value != null) {
if (field.schema().type() == Schema.Type.STRUCT) {
Struct structValue = (Struct) value;
if (field.schema().name().equals(VariableScaleDecimal.class.getName())) {
value = new BigDecimal(new BigInteger(structValue.getBytes(VariableScaleDecimal.VALUE_FIELD)),
structValue.getInt32(VariableScaleDecimal.SCALE_FIELD));
}
}
if (field.schema().type() == Schema.Type.INT64) {
Long longValue = (Long) value;
if (field.schema().name().equals(Timestamp.class.getName())) {
value = LocalDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneOffset.UTC);
}
}
// todo 考虑其他情况
}
result.put(field.name(), value);
}
return result;
}
}
package com.tbyf.oraclecdc;
import java.util.List;
public interface RecordChangeHandler {
void handleChanges(List<RecordChange> changes) throws Exception;
default void close() {
}
}
package com.tbyf.oraclecdc.util;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;
/**
* 开启Oracle数据库和表的supplemental logging
*/
public class CdcHelper {
private static final String QUERY_DB_SUPPLEMENTAL_LOGGING_SQL = "SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE";
private static final String ENABLE_DB_SUPPLEMENTAL_LOGGING_SQL = "ALTER DATABASE ADD SUPPLEMENTAL LOG DATA";
private static final String QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL = "SELECT OWNER, TABLE_NAME FROM ALL_LOG_GROUPS WHERE LOG_GROUP_TYPE = 'ALL COLUMN LOGGING'";
private static final String ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL = "ALTER TABLE %s.%s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS";
/**
* 开启Oracle数据库supplemental logging
*
* @param conn
* @throws SQLException
*/
public static void enableDBSupplementalLogging(Connection conn) throws SQLException {
if (!isDBSupplementalLoggingEnabled(conn)) {
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate(ENABLE_DB_SUPPLEMENTAL_LOGGING_SQL);
}
}
}
public static boolean isDBSupplementalLoggingEnabled(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery(QUERY_DB_SUPPLEMENTAL_LOGGING_SQL)) {
if (rs.next()) {
String result = rs.getString(1);
return "YES".equals(result) || "IMPLICIT".equals(result);
} else {
throw new SQLException("无法查询数据库的supplemental logging");
}
}
}
}
/**
* 开启Oracle schema下指定表的supplemental logging
* @param conn
* @param schema
* @param tables
* @throws SQLException
*/
public static void enableSchemaTablesSupplementalLogging(Connection conn, String schema, Set<String> tables) throws SQLException {
Set<String> needEnableTables = new HashSet<>(tables);
Set<String> enabledTables = getSchemaSupplementalLoggingEnabledTables(conn, schema);
needEnableTables.removeAll(enabledTables);
if (!needEnableTables.isEmpty()) {
try (Statement stmt = conn.createStatement()) {
for (String table : needEnableTables) {
stmt.executeUpdate(String.format(ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL, schema, table));
}
}
}
}
private static Set<String> getSchemaSupplementalLoggingEnabledTables(Connection conn, String schema) throws SQLException {
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery(QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL)) {
Set<String> tables = new HashSet<>();
while (rs.next()) {
if (schema.equalsIgnoreCase(rs.getString(1))) {
tables.add(rs.getString(2));
}
}
return tables;
}
}
}
}
package com.tbyf.oraclecdc.util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBUtils {
public static String buildOracleJdbcUrl(String host, int port, String dbname) {
return String.format("jdbc:oracle:thin:@%s:%d:%s", host, port, dbname);
}
public static Connection getConn(String url, String user, String password) throws SQLException {
return DriverManager.getConnection(url, user, password);
}
}
package com.tbyf.oraclecdc.util;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtils {
public static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
public static String getNowAsString() {
Date now = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_PATTERN);
return dateFormat.format(now);
}
}
<configuration>
<!-- 定义日志文件的输出模式 -->
<property name="LOG_PATH" value="logs" />
<property name="LOG_FILE" value="application.log" />
<!-- 控制台输出 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 文件输出 -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/${LOG_FILE}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天生成一个新的日志文件 -->
<fileNamePattern>${LOG_PATH}/application.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 保留最近 30 天的日志文件 -->
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- 配置日志输出级别,ROOT logger 同时输出到控制台和文件 -->
<root level="INFO">
<appender-ref ref="CONSOLE" />
<appender-ref ref="FILE" />
</root>
</configuration>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment