Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
D
data-sync
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
黄营
data-sync
Commits
d060d162
Commit
d060d162
authored
Nov 05, 2024
by
y1sa
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
改造为oracle同步
parent
d5f5ba3a
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
139 additions
and
124 deletions
+139
-124
.gitignore
.gitignore
+5
-2
datasync.properties
datasync.properties
+12
-12
dbhistory.dat
dbhistory.dat
+0
-0
offset.dat
offset.dat
+0
-0
pom.xml
pom.xml
+14
-1
Main.java
src/main/java/com/tbyd/data/datasync/Main.java
+2
-0
CdcHelper.java
src/main/java/com/tbyd/data/datasync/core/CdcHelper.java
+53
-68
DBUtils.java
src/main/java/com/tbyd/data/datasync/core/DBUtils.java
+7
-8
DataSyncInitializer.java
...java/com/tbyd/data/datasync/core/DataSyncInitializer.java
+18
-10
DataSyncWriter.java
...main/java/com/tbyd/data/datasync/core/DataSyncWriter.java
+23
-22
debezium.properties
src/main/resources/debezium.properties
+5
-1
No files found.
.gitignore
View file @
d060d162
...
...
@@ -39,4 +39,7 @@ build/
.idea/
logs/
\ No newline at end of file
logs/
dbhistory.dat
offset.dat
\ No newline at end of file
datasync.properties
View file @
d060d162
source.host
=
192.168.0.85
source.port
=
1433
source.user
=
sa
source.password
=
1
source.dbname
=
dz_his
sync.tables
=
dbo.BA_BRDA(ZYH)
dest.host
=
192.168.0.85
dest.port
=
1433
dest.user
=
sa
dest.password
=
1
dest.dbname
=
dz_emr
\ No newline at end of file
source.host
=
192.168.0.152
source.port
=
1521
source.user
=
datacenter
source.password
=
data
source.dbname
=
ORCL
sync.tables
=
DATACENTER.TEST_SYNC(ID)
dest.host
=
10.99.44.13
dest.port
=
11521
dest.user
=
datacenter
dest.password
=
TBYF_soft$data
dest.dbname
=
helowin
\ No newline at end of file
dbhistory.dat
deleted
100644 → 0
View file @
d5f5ba3a
This diff is collapsed.
Click to expand it.
offset.dat
deleted
100644 → 0
View file @
d5f5ba3a
File deleted
pom.xml
View file @
d060d162
...
...
@@ -20,6 +20,7 @@
<logback.version>
1.2.12
</logback.version>
<lombok.version>
1.18.30
</lombok.version>
<druid.version>
1.2.22
</druid.version>
<oracle.version>
19.3.0.0
</oracle.version>
</properties>
<dependencies>
...
...
@@ -33,7 +34,7 @@
<artifactId>
debezium-embedded
</artifactId>
<version>
${debezium.version}
</version>
</dependency>
<dependency>
<
!--<
dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<version>${debezium.version}</version>
...
...
@@ -48,7 +49,19 @@
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
</dependency>-->
<dependency>
<groupId>
io.debezium
</groupId>
<artifactId>
debezium-connector-oracle
</artifactId>
<version>
${debezium.version}
</version>
</dependency>
<dependency>
<groupId>
com.oracle.ojdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<version>
${oracle.version}
</version>
</dependency>
<dependency>
<groupId>
org.apache.commons
</groupId>
<artifactId>
commons-lang3
</artifactId>
...
...
src/main/java/com/tbyd/data/datasync/Main.java
View file @
d060d162
...
...
@@ -23,6 +23,8 @@ public class Main {
return
;
}
new
AppCleaner
().
cleanBeforeStart
();
Properties
props
=
DebeziumProperties
.
get
();
...
...
src/main/java/com/tbyd/data/datasync/core/CdcHelper.java
View file @
d060d162
package
com
.
tbyd
.
data
.
datasync
.
core
;
import
lombok.extern.slf4j.Slf4j
;
import
java.sql.Connection
;
import
java.sql.ResultSet
;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.util.HashSet
;
import
java.util.Set
;
@Slf4j
/**
* 开启Oracle数据库和表的supplemental logging
*/
public
class
CdcHelper
{
private
static
final
String
DB_CDC_ENABLED_QUERY_SQL
=
"SELECT is_cdc_enabled \n"
+
"FROM sys.databases \n"
+
"WHERE name = '${dbName}'"
;
private
static
final
String
ENABLE_DB_CDC_SQL
=
"EXEC sys.sp_cdc_enable_db"
;
private
static
final
String
QUERY_DB_SUPPLEMENTAL_LOGGING_SQL
=
"SELECT SUPPLEMENTAL_LOG_DATA_MIN FROM V$DATABASE"
;
private
static
final
String
TABLE_CDC_ENABLED_QUERY_SQL
=
"SELECT is_tracked_by_cdc \n"
+
"FROM sys.tables \n"
+
"WHERE name = '${tableName}' AND schema_id = SCHEMA_ID('dbo')"
;
private
static
final
String
ENABLE_DB_SUPPLEMENTAL_LOGGING_SQL
=
"ALTER DATABASE ADD SUPPLEMENTAL LOG DATA"
;
private
static
final
String
ENABLE_TABLE_CDC_SQL
=
"EXEC sys.sp_cdc_enable_table \n"
+
"@source_schema = N'dbo', \n"
+
"@source_name = N'${tableName}', \n"
+
"@role_name = N'NULL', \n"
+
// TODO role_name
"@supports_net_changes = 0"
;
private
static
final
String
QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL
=
"SELECT OWNER, TABLE_NAME FROM ALL_LOG_GROUPS WHERE LOG_GROUP_TYPE = 'ALL COLUMN LOGGING'"
;
private
static
final
String
ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL
=
"ALTER TABLE %s.%s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"
;
// 查询数据库是否开启了CDC
public
static
boolean
isDBCdcEnabled
(
Connection
conn
,
String
dbName
)
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
DBUtils
.
switchDB
(
stmt
,
dbName
);
String
sql
=
DB_CDC_ENABLED_QUERY_SQL
.
replace
(
"${dbName}"
,
dbName
);
try
(
ResultSet
rs
=
stmt
.
executeQuery
(
sql
))
{
if
(
rs
.
next
())
{
return
rs
.
getInt
(
1
)
==
1
;
}
/**
* 开启Oracle数据库supplemental logging
*
* @param conn
* @throws SQLException
*/
public
static
void
enableDBSupplementalLogging
(
Connection
conn
)
throws
SQLException
{
if
(!
isDBSupplementalLoggingEnabled
(
conn
))
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
stmt
.
executeUpdate
(
ENABLE_DB_SUPPLEMENTAL_LOGGING_SQL
);
}
}
catch
(
SQLException
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
}
throw
new
IllegalStateException
(
"查询数据库["
+
dbName
+
"]CDC开启状态失败"
);
}
public
static
void
enableDBCdcIfNeeded
(
Connection
conn
,
String
dbName
)
{
if
(!
isDBCdcEnabled
(
conn
,
dbName
))
{
enableDBCdc
(
conn
,
dbName
);
}
}
public
static
void
enableDBCdc
(
Connection
conn
,
String
dbName
)
{
public
static
boolean
isDBSupplementalLoggingEnabled
(
Connection
conn
)
throws
SQLException
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
DBUtils
.
switchDB
(
stmt
,
dbName
);
stmt
.
execute
(
ENABLE_DB_CDC_SQL
);
}
catch
(
SQLException
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
throw
new
IllegalStateException
(
"开启数据库["
+
dbName
+
"]CDC失败"
,
e
);
try
(
ResultSet
rs
=
stmt
.
executeQuery
(
QUERY_DB_SUPPLEMENTAL_LOGGING_SQL
))
{
if
(
rs
.
next
())
{
String
result
=
rs
.
getString
(
1
);
return
"YES"
.
equals
(
result
)
||
"IMPLICIT"
.
equals
(
result
);
}
else
{
throw
new
SQLException
(
"无法查询数据库的supplemental logging"
);
}
}
}
}
// 查询表是否开启了CDC
public
static
boolean
isTableCdcEnabled
(
Connection
conn
,
String
dbName
,
String
tableName
)
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
DBUtils
.
switchDB
(
stmt
,
dbName
);
String
sql
=
TABLE_CDC_ENABLED_QUERY_SQL
.
replace
(
"${tableName}"
,
tableName
);
try
(
ResultSet
rs
=
stmt
.
executeQuery
(
sql
))
{
if
(
rs
.
next
())
{
return
rs
.
getInt
(
1
)
==
1
;
/**
* 开启Oracle schema下指定表的supplemental logging
* @param conn
* @param schema
* @param tables
* @throws SQLException
*/
public
static
void
enableSchemaTablesSupplementalLogging
(
Connection
conn
,
String
schema
,
Set
<
String
>
tables
)
throws
SQLException
{
Set
<
String
>
needEnableTables
=
new
HashSet
<>(
tables
);
Set
<
String
>
enabledTables
=
getSchemaSupplementalLoggingEnabledTables
(
conn
,
schema
);
needEnableTables
.
removeAll
(
enabledTables
);
if
(!
needEnableTables
.
isEmpty
())
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
for
(
String
table
:
needEnableTables
)
{
stmt
.
executeUpdate
(
String
.
format
(
ENABLE_TABLE_SUPPLEMENTAL_LOGGING_SQL
,
schema
,
table
));
}
}
}
catch
(
SQLException
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
}
throw
new
IllegalStateException
(
"查询表["
+
dbName
+
"."
+
tableName
+
"]CDC开启状态失败"
);
}
p
ublic
static
void
enableTableCdc
(
Connection
conn
,
String
dbName
,
String
tableName
)
{
p
rivate
static
Set
<
String
>
getSchemaSupplementalLoggingEnabledTables
(
Connection
conn
,
String
schema
)
throws
SQLException
{
try
(
Statement
stmt
=
conn
.
createStatement
())
{
DBUtils
.
switchDB
(
stmt
,
dbName
);
String
sql
=
ENABLE_TABLE_CDC_SQL
.
replace
(
"${tableName}"
,
tableName
);
stmt
.
execute
(
sql
);
}
catch
(
SQLException
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
throw
new
IllegalStateException
(
"开启表["
+
dbName
+
"."
+
tableName
+
"]CDC失败"
);
try
(
ResultSet
rs
=
stmt
.
executeQuery
(
QUERY_TABLE_SUPPLEMENTAL_LOGGING_SQL
))
{
Set
<
String
>
tables
=
new
HashSet
<>();
while
(
rs
.
next
())
{
if
(
schema
.
equalsIgnoreCase
(
rs
.
getString
(
1
)))
{
tables
.
add
(
rs
.
getString
(
2
));
}
}
return
tables
;
}
}
}
public
static
void
enableTableCdcIfNeeded
(
Connection
conn
,
String
dbName
,
String
tableName
)
{
if
(!
isTableCdcEnabled
(
conn
,
dbName
,
tableName
))
{
enableTableCdc
(
conn
,
dbName
,
tableName
);
}
}
}
\ No newline at end of file
src/main/java/com/tbyd/data/datasync/core/DBUtils.java
View file @
d060d162
...
...
@@ -14,13 +14,13 @@ import java.util.Map;
@Slf4j
public
class
DBUtils
{
public
static
final
String
SQL_SERVER_DRIVER_CLASS_NAME
=
"com.microsoft.sqlserver.jdbc.SQLServer
Driver"
;
public
static
final
String
ORACLE_DRIVER_CLASS_NAME
=
"oracle.jdbc.driver.Oracle
Driver"
;
static
{
try
{
Class
.
forName
(
SQL_SERVER
_DRIVER_CLASS_NAME
);
Class
.
forName
(
ORACLE
_DRIVER_CLASS_NAME
);
}
catch
(
ClassNotFoundException
e
)
{
throw
new
IllegalStateException
(
"未找到
sql server
驱动"
,
e
);
throw
new
IllegalStateException
(
"未找到
Oracle jdbc
驱动"
,
e
);
}
}
...
...
@@ -54,9 +54,8 @@ public class DBUtils {
}
}
public
static
String
getSqlServerUrl
(
String
host
,
String
port
)
{
return
SQL_SERVER_URL_PATTERN
.
replace
(
"<host>"
,
host
)
.
replace
(
"<port>"
,
port
);
public
static
String
buildOracleJdbcUrl
(
String
host
,
String
port
,
String
dbname
)
{
return
String
.
format
(
"jdbc:oracle:thin:@%s:%s:%s"
,
host
,
port
,
dbname
);
}
public
static
void
switchDB
(
Statement
stmt
,
String
dbName
)
throws
SQLException
{
...
...
@@ -64,8 +63,8 @@ public class DBUtils {
stmt
.
execute
(
sql
);
}
public
static
Connection
getConn
(
String
host
,
String
port
,
String
user
,
String
password
)
throws
SQLException
{
return
DriverManager
.
getConnection
(
getSqlServerUrl
(
host
,
port
),
user
,
password
);
public
static
Connection
getConn
(
String
host
,
String
port
,
String
dbname
,
String
user
,
String
password
)
throws
SQLException
{
return
DriverManager
.
getConnection
(
buildOracleJdbcUrl
(
host
,
port
,
dbname
),
user
,
password
);
}
public
static
void
closeResource
(
Connection
conn
)
{
...
...
src/main/java/com/tbyd/data/datasync/core/DataSyncInitializer.java
View file @
d060d162
...
...
@@ -4,7 +4,10 @@ import com.tbyd.data.datasync.config.DataSyncProperties;
import
lombok.extern.slf4j.Slf4j
;
import
java.sql.Connection
;
import
java.util.Arrays
;
import
java.util.HashSet
;
import
java.util.Properties
;
import
java.util.stream.Collectors
;
@Slf4j
public
class
DataSyncInitializer
{
...
...
@@ -19,22 +22,27 @@ public class DataSyncInitializer {
String
sourceDbname
=
DataSyncProperties
.
getSourceDbname
(
props
);
String
[]
syncTableNames
=
DataSyncProperties
.
getSyncTableNamesArray
(
props
);
try
{
Connection
conn
=
DBUtils
.
getConn
(
sourceHost
,
sourcePort
,
sourceUser
,
sourcePassword
);
CdcHelper
.
enableDB
CdcIfNeeded
(
conn
,
sourceDbname
);
Connection
conn
=
DBUtils
.
getConn
(
sourceHost
,
sourcePort
,
source
Dbname
,
source
User
,
sourcePassword
);
CdcHelper
.
enableDB
SupplementalLogging
(
conn
);
DBUtils
.
closeResource
(
conn
);
for
(
String
tableName
:
syncTableNames
)
{
conn
=
DBUtils
.
getConn
(
sourceHost
,
sourcePort
,
sourceUser
,
sourcePassword
);
CdcHelper
.
enableTableCdcIfNeeded
(
conn
,
sourceDbname
,
stripSchemaId
(
tableName
));
DBUtils
.
closeResource
(
conn
);
}
conn
=
DBUtils
.
getConn
(
sourceHost
,
sourcePort
,
sourceDbname
,
sourceUser
,
sourcePassword
);
CdcHelper
.
enableSchemaTablesSupplementalLogging
(
conn
,
sourceUser
,
new
HashSet
<>(
Arrays
.
asList
(
syncTableNames
))
.
stream
()
.
map
(
DataSyncInitializer:
:
stripSchema
)
.
collect
(
Collectors
.
toSet
()));
DBUtils
.
closeResource
(
conn
);
}
catch
(
Exception
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
throw
new
IllegalStateException
(
"初始化失败"
,
e
);
}
}
private
static
String
stripSchemaId
(
String
tableName
)
{
String
schemaIdPrefix
=
DBO_SCHEMA_ID
+
"."
;
return
tableName
.
substring
(
schemaIdPrefix
.
length
());
public
static
String
stripSchema
(
String
fullName
)
{
return
fullName
.
substring
(
fullName
.
indexOf
(
"."
)
+
1
);
}
}
src/main/java/com/tbyd/data/datasync/core/DataSyncWriter.java
View file @
d060d162
...
...
@@ -5,6 +5,7 @@ import com.alibaba.druid.pool.DruidPooledConnection;
import
com.tbyd.data.datasync.config.DataSyncProperties
;
import
com.tbyd.data.datasync.config.TableAndKeys
;
import
io.debezium.data.Envelope
;
import
io.debezium.data.VariableScaleDecimal
;
import
io.debezium.engine.ChangeEvent
;
import
io.debezium.engine.DebeziumEngine
;
import
lombok.extern.slf4j.Slf4j
;
...
...
@@ -13,10 +14,15 @@ import org.apache.kafka.connect.data.Schema;
import
org.apache.kafka.connect.data.Struct
;
import
org.apache.kafka.connect.source.SourceRecord
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Connection
;
import
java.sql.PreparedStatement
;
import
java.sql.SQLException
;
import
java.sql.Timestamp
;
import
java.time.Instant
;
import
java.time.LocalDateTime
;
import
java.time.ZoneOffset
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
...
...
@@ -36,7 +42,7 @@ public class DataSyncWriter implements RecordHandler {
String
user
=
DataSyncProperties
.
getDestinationUser
(
props
);
String
password
=
DataSyncProperties
.
getDestinationPassword
(
props
);
String
dbname
=
DataSyncProperties
.
getDestinationDbname
(
props
);
String
url
=
buildUrl
(
host
,
port
,
dbname
);
String
url
=
build
OracleJdbc
Url
(
host
,
port
,
dbname
);
dataSource
=
new
DruidDataSource
();
dataSource
.
setUrl
(
url
);
dataSource
.
setUsername
(
user
);
...
...
@@ -58,10 +64,8 @@ public class DataSyncWriter implements RecordHandler {
}
}
private
static
String
buildUrl
(
String
host
,
String
port
,
String
dbName
)
{
return
JDBC_PATTERN
.
replace
(
"<server_name>"
,
host
)
.
replace
(
"<port>"
,
port
)
.
replace
(
"<database_name>"
,
dbName
);
public
static
String
buildOracleJdbcUrl
(
String
host
,
String
port
,
String
dbname
)
{
return
String
.
format
(
"jdbc:oracle:thin:@%s:%s:%s"
,
host
,
port
,
dbname
);
}
@Override
...
...
@@ -72,7 +76,7 @@ public class DataSyncWriter implements RecordHandler {
return
false
;
}
Schema
valueSchema
=
r
.
valueSchema
();
if
(
valueSchema
.
name
().
equals
(
"io.debezium.connector.
sqlserver
.SchemaChangeValue"
))
{
if
(
valueSchema
.
name
().
equals
(
"io.debezium.connector.
oracle
.SchemaChangeValue"
))
{
return
false
;
}
return
true
;
...
...
@@ -272,25 +276,22 @@ public class DataSyncWriter implements RecordHandler {
if
(
value
==
null
)
{
return
null
;
}
if
(
fieldSchema
.
name
()
==
null
)
{
if
(
fieldSchema
.
type
()
==
Schema
.
Type
.
STRING
)
{
return
value
;
}
if
(
fieldSchema
.
type
()
==
Schema
.
Type
.
INT32
)
{
return
value
;
}
if
(
fieldSchema
.
type
()
==
Schema
.
Type
.
BYTES
)
{
return
value
;
if
(
fieldSchema
.
type
()
==
Schema
.
Type
.
STRUCT
)
{
Struct
structValue
=
(
Struct
)
value
;
if
(
fieldSchema
.
name
().
equals
(
VariableScaleDecimal
.
class
.
getName
()))
{
value
=
new
BigDecimal
(
new
BigInteger
(
structValue
.
getBytes
(
VariableScaleDecimal
.
VALUE_FIELD
)),
structValue
.
getInt32
(
VariableScaleDecimal
.
SCALE_FIELD
));
}
}
switch
(
fieldSchema
.
name
())
{
case
"org.apache.kafka.connect.data.Decimal"
:
return
value
;
case
"io.debezium.time.Timestamp"
:
return
new
Timestamp
((
Long
)
value
);
default
:
throw
new
IllegalStateException
(
"暂不支持这种数据类型的字段:"
+
fieldSchema
.
name
());
if
(
fieldSchema
.
type
()
==
Schema
.
Type
.
INT64
)
{
Long
longValue
=
(
Long
)
value
;
if
(
fieldSchema
.
name
().
equals
(
io
.
debezium
.
time
.
Timestamp
.
class
.
getName
()))
{
value
=
LocalDateTime
.
ofInstant
(
Instant
.
ofEpochMilli
(
longValue
),
ZoneOffset
.
UTC
);
}
}
return
value
;
}
private
static
String
buildInsertSql
(
TableMetadata
tableMetadata
)
{
...
...
src/main/resources/debezium.properties
View file @
d060d162
name
=
engine
connector.class
=
io.debezium.connector.
sqlserver.SqlServer
Connector
connector.class
=
io.debezium.connector.
oracle.Oracle
Connector
database.history
=
io.debezium.relational.history.FileDatabaseHistory
snapshot.mode
=
schema_only
database.server.name
=
server1
offset.storage.file.filename
=
offset.dat
database.history.file.filename
=
dbhistory.dat
log.mining.strategy
=
online_catalog
database.history.skip.unparseable.ddl
=
true
database.history.store.only.captured.tables.ddl
=
true
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment