Commit 2f93df9e by 成浩

针对oracle一个库很多用户适配环境做了适当改动

parent b5077bd8
......@@ -23,10 +23,13 @@ public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> {
debeziumProps.setProperty("log.mining.strategy", "online_catalog");
String schema = props.getUsername().toUpperCase();
if (props.getSchemaList()!=null){
schema = props.getSchemaList();
}
debeziumProps.setProperty(Constants.SCHEMA_INCLUDE_LIST_PROP, schema);
String capturedTables = new HashSet<>(Arrays.asList(props.getCapturedTables().toUpperCase().split(",")))
.stream().map(tableName -> schema + "." + tableName)
.stream()//.map(tableName -> schema + "." + tableName)
.collect(Collectors.joining(","));
debeziumProps.setProperty(Constants.TABLE_INCLUDE_LIST_PROP, capturedTables);
......
......@@ -6,6 +6,15 @@ public class OracleCdcEngineProperties extends BaseCdcEngineProperties {
private String dbname;
private String capturedTables;
private String schemaList;
public String getSchemaList() {
return schemaList;
}
public void setSchemaList(String schemaList) {
this.schemaList = schemaList;
}
public String getDbname() {
return dbname;
......
......@@ -4,6 +4,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
......@@ -26,12 +27,12 @@ public class SupplementalLoggingUtils {
/**
* 查询所有已开启了supplemental logging的表的SQL
*/
private static final String QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL = "SELECT OWNER, TABLE_NAME FROM ALL_LOG_GROUPS WHERE LOG_GROUP_TYPE = 'ALL COLUMN LOGGING'";
private static final String QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL = "SELECT OWNER, OWNER||'.'||TABLE_NAME AS TABLE_NAME FROM ALL_LOG_GROUPS WHERE LOG_GROUP_TYPE = 'ALL COLUMN LOGGING'";
/**
* 开启表的supplemental logging SQL
*/
private static final String ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL = "ALTER TABLE %s.%s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS";
private static final String ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL = "ALTER TABLE %s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS";
/**
* 开启Oracle数据库supplemental logging
......@@ -75,7 +76,7 @@ public class SupplementalLoggingUtils {
if (!needEnableTables.isEmpty()) {
try (Statement stmt = conn.createStatement()) {
for (String table : needEnableTables) {
stmt.executeUpdate(String.format(ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL, schema, table));
stmt.executeUpdate(String.format(ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL, table));
}
}
}
......@@ -89,7 +90,7 @@ public class SupplementalLoggingUtils {
try (ResultSet rs = stmt.executeQuery(QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL)) {
Set<String> tables = new HashSet<>();
while (rs.next()) {
if (schema.equals(rs.getString(1))) {
if (Arrays.asList(schema.split(",")).contains(rs.getString(1))) {
tables.add(rs.getString(2));
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment