Commit 5fe6d122 by 成浩

Add handling for garbled Chinese characters captured from Oracle in Spanish-locale environments

parent 2f93df9e
package com.tbyf.cdcengine2.oracle;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import io.debezium.connector.common.OffsetReader;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.SingleThreadAccess;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
/**
* Base class for Debezium's CDC {@link SourceTask} implementations. Provides functionality common to all connectors,
* such as validation of the configuration.
*
* @author Gunnar Morling
*/
public abstract class BaseSourceTask<P extends Partition, O extends OffsetContext> extends SourceTask {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);
private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1);
protected static enum State {
RUNNING,
STOPPED;
}
private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
/**
* Used to ensure that start(), stop() and commitRecord() calls are serialized.
*/
private final ReentrantLock stateLock = new ReentrantLock();
private volatile ElapsedTimeStrategy restartDelay;
/**
* Raw connector properties, kept here so they can be passed again in case of a restart.
*/
private volatile Map<String, String> props;
/**
* The change event source coordinator for those connectors adhering to the new
* framework structure, {@code null} for legacy-style connectors.
*/
private ChangeEventSourceCoordinator<P, O> coordinator;
/**
* The latest offset that has been acknowledged by the Kafka producer. Will be
* acknowledged with the source database in {@link BaseSourceTask#commit()}
* (which may be a no-op depending on the connector).
*/
private volatile Map<String, ?> lastOffset;
private Duration retriableRestartWait;
private final ElapsedTimeStrategy pollOutputDelay;
private final Clock clock = Clock.system();
@SingleThreadAccess("polling thread")
private Instant previousOutputInstant;
protected BaseSourceTask() {
// Use exponential delay to log the progress frequently at first, but then quickly tapering off to once an hour...
pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);
// Initialize our poll output delay logic ...
pollOutputDelay.hasElapsed();
previousOutputInstant = clock.currentTimeAsInstant();
}
@Override
public final void start(Map<String, String> props) {
if (context == null) {
throw new ConnectException("Unexpected null context");
}
stateLock.lock();
try {
if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
LOGGER.info("Connector has already been started");
return;
}
this.props = props;
Configuration config = Configuration.from(props);
retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
// need to reset the delay or you only get one delayed restart
restartDelay = null;
if (!config.validateAndRecord(getAllConfigurationFields(), LOGGER::error)) {
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Starting {} with configuration:", getClass().getSimpleName());
config.withMaskedPasswords().forEach((propName, propValue) -> {
LOGGER.info(" {} = {}", propName, propValue);
});
}
this.coordinator = start(config);
}
finally {
stateLock.unlock();
}
}
/**
* Called once when starting this source task.
*
* @param config
* the task configuration; implementations should wrap it in a dedicated implementation of
* {@link CommonConnectorConfig} and work with typed access to configuration properties that way
*/
protected abstract ChangeEventSourceCoordinator<P, O> start(Configuration config);
@Override
public final List<SourceRecord> poll() throws InterruptedException {
boolean started = startIfNeededAndPossible();
// in backoff period after a retriable exception
if (!started) {
// WorkerSourceTask calls us immediately after we return the empty list.
// This turns into a throttling so we need to make a pause before we return
// the control back.
Metronome.parker(Duration.of(2, ChronoUnit.SECONDS), Clock.SYSTEM).pause();
return Collections.emptyList();
}
try {
final List<SourceRecord> records = doPoll();
logStatistics(records);
return records;
}
catch (RetriableException e) {
stop(true);
throw e;
}
}
void logStatistics(final List<SourceRecord> records) {
if (records == null || !LOGGER.isInfoEnabled()) {
return;
}
int batchSize = records.size();
if (batchSize > 0) {
SourceRecord lastRecord = records.get(batchSize - 1);
lastOffset = lastRecord.sourceOffset();
if (pollOutputDelay.hasElapsed()) {
// We want to record the status ...
final Instant currentTime = clock.currentTime();
LOGGER.info("{} records sent during previous {}, last recorded offset: {}", batchSize,
Strings.duration(Duration.between(previousOutputInstant, currentTime).toMillis()), lastOffset);
previousOutputInstant = currentTime;
}
}
}
/**
* Returns the next batch of source records, if any are available.
*/
protected abstract List<SourceRecord> doPoll() throws InterruptedException;
/**
* Starts this connector in case it has been stopped after a retriable error,
* and the backoff period has passed.
*/
private boolean startIfNeededAndPossible() {
stateLock.lock();
try {
if (state.get() == State.RUNNING) {
return true;
}
else if (restartDelay != null && restartDelay.hasElapsed()) {
start(props);
return true;
}
else {
LOGGER.info("Awaiting end of restart backoff period after a retriable error");
return false;
}
}
finally {
stateLock.unlock();
}
}
@Override
public final void stop() {
stop(false);
}
private void stop(boolean restart) {
stateLock.lock();
try {
if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
LOGGER.info("Connector has already been stopped");
return;
}
if (restart) {
LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", retriableRestartWait.getSeconds());
}
else {
LOGGER.info("Stopping down connector");
}
try {
if (coordinator != null) {
coordinator.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping coordinator", e);
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
doStop();
if (restart && restartDelay == null) {
restartDelay = ElapsedTimeStrategy.constant(Clock.system(), retriableRestartWait.toMillis());
restartDelay.hasElapsed();
}
}
finally {
stateLock.unlock();
}
}
protected abstract void doStop();
@Override
public void commitRecord(SourceRecord record) throws InterruptedException {
Map<String, ?> currentOffset = record.sourceOffset();
if (currentOffset != null) {
this.lastOffset = currentOffset;
}
}
@Override
public void commit() throws InterruptedException {
boolean locked = stateLock.tryLock();
if (locked) {
try {
if (coordinator != null && lastOffset != null) {
coordinator.commitOffset(lastOffset);
}
}
finally {
stateLock.unlock();
}
}
else {
LOGGER.warn("Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart");
}
}
/**
* Returns all configuration {@link Field} supported by this source task.
*/
protected abstract Iterable<Field> getAllConfigurationFields();
/**
* Loads the connector's persistent offsets (if present) via the given loader.
*/
protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
Set<P> partitions = provider.getPartitions();
OffsetReader<P, O, OffsetContext.Loader<O>> reader = new OffsetReader<>(
context.offsetStorageReader(), loader);
Map<P, O> offsets = reader.offsets(partitions);
boolean found = false;
for (P partition : partitions) {
O offset = offsets.get(partition);
if (offset != null) {
found = true;
LOGGER.info("Found previous partition offset {}: {}", partition, offset.getOffset());
}
}
if (!found) {
LOGGER.info("No previous offsets found");
}
return Offsets.of(offsets);
}
}
package com.tbyf.cdcengine2.oracle;
import io.debezium.config.Configuration;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.connector.oracle.Module;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorTask;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CustomOracleConnector extends RelationalBaseSourceConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomOracleConnector.class);
private Map<String, String> properties;
@Override
public String version() {
return Module.version();
}
@Override
public void start(Map<String, String> props) {
this.properties = Collections.unmodifiableMap(new HashMap<>(props));
}
@Override
public Class<? extends Task> taskClass() {
return CustomOracleConnectorTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (maxTasks > 1) {
throw new IllegalArgumentException("Only a single connector task may be started");
}
return Collections.singletonList(properties);
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return OracleConnectorConfig.configDef();
}
@Override
protected void validateConnection(Map<String, ConfigValue> configValues, Configuration config) {
final ConfigValue databaseValue = configValues.get(RelationalDatabaseConnectorConfig.DATABASE_NAME.name());
if (!databaseValue.errorMessages().isEmpty()) {
return;
}
final ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
final ConfigValue userValue = configValues.get(RelationalDatabaseConnectorConfig.USER.name());
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader())) {
LOGGER.debug("Successfully tested connection for {} with user '{}'", OracleConnection.connectionString(connectorConfig.getJdbcConfig()),
connection.username());
}
catch (SQLException | RuntimeException e) {
LOGGER.error("Failed testing connection for {} with user '{}'", config.withMaskedPasswords(), userValue, e);
hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());
}
}
@Override
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return config.validate(OracleConnectorConfig.ALL_FIELDS);
}
}
package com.tbyf.cdcengine2.oracle;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.*;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class CustomOracleConnectorTask extends BaseSourceTask<OraclePartition, OracleOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomOracleConnectorTask.class);
private static final String CONTEXT_NAME = "custom-oracle-connector-task";
private volatile OracleTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile OracleConnection jdbcConnection;
private volatile ErrorHandler errorHandler;
private volatile OracleDatabaseSchema schema;
@Override
public String version() {
return Module.version();
}
@Override
public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(Configuration config) {
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();
jdbcConfig = JdbcConfiguration.adapt(jdbcConfig.edit()
.with(JdbcConfiguration.CONNECTION_FACTORY_CLASS, config.getString(JdbcConfiguration.CONNECTION_FACTORY_CLASS))
.build());
// Reassign the adapted configuration above (now carrying the custom connection factory class) back to the jdbcConfig variable
jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());
validateRedoLogConfiguration();
OracleValueConverters valueConverters = new OracleValueConverters(connectorConfig, jdbcConnection);
OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(valueConverters, jdbcConnection);
StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection);
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, defaultValueConverter, schemaNameAdjuster,
topicSelector, tableNameCaseSensitivity);
Offsets<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new Provider(connectorConfig),
connectorConfig.getAdapter().getOffsetContextLoader());
OraclePartition partition = previousOffsets.getTheOnlyPartition();
OracleOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
validateAndLoadDatabaseHistory(connectorConfig, partition, previousOffset, schema);
taskContext = new OracleTaskContext(connectorConfig, schema);
Clock clock = Clock.system();
// Set up the task record queue ...
this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();
errorHandler = new OracleErrorHandler(connectorConfig, queue);
final OracleEventMetadataProvider metadataProvider = new OracleEventMetadataProvider();
EventDispatcher<OraclePartition, TableId> dispatcher = new EventDispatcher<>(
connectorConfig,
topicSelector,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
schemaNameAdjuster);
final OracleStreamingChangeEventSourceMetrics streamingMetrics = new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider,
connectorConfig);
ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
errorHandler,
OracleConnector.class,
connectorConfig,
new OracleChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock, schema, jdbcConfig, taskContext, streamingMetrics),
new OracleChangeEventSourceMetricsFactory(streamingMetrics),
dispatcher,
schema);
coordinator.start(taskContext, this.queue, metadataProvider);
return coordinator;
}
@Override
public List<SourceRecord> doPoll() throws InterruptedException {
List<DataChangeEvent> records = queue.poll();
List<SourceRecord> sourceRecords = records.stream()
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
return sourceRecords;
}
@Override
public void doStop() {
try {
if (jdbcConnection != null) {
jdbcConnection.close();
}
}
catch (SQLException e) {
LOGGER.error("Exception while closing JDBC connection", e);
}
schema.close();
}
@Override
protected Iterable<Field> getAllConfigurationFields() {
return OracleConnectorConfig.ALL_FIELDS;
}
private void validateRedoLogConfiguration() {
// Check whether the archive log is enabled.
// final boolean archivelogMode = jdbcConnection.isArchiveLogMode();
// if (!archivelogMode) {
// throw new DebeziumException("The Oracle server is not configured to use a archive log LOG_MODE, which is "
// + "required for this connector to work properly. Change the Oracle configuration to use a "
// + "LOG_MODE=ARCHIVELOG and restart the connector.");
// }
}
private void validateAndLoadDatabaseHistory(OracleConnectorConfig config, OraclePartition partition, OracleOffsetContext offset, OracleDatabaseSchema schema) {
if (offset == null) {
if (config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
// We are in schema only recovery mode, use the existing redo log position
// would like to also verify redo log position exists, but it defaults to 0 which is technically valid
throw new DebeziumException("Could not find existing redo log information while attempting schema only recovery snapshot");
}
LOGGER.info("Connector started for the first time, database history recovery will not be executed");
schema.initializeStorage();
return;
}
if (!schema.historyExists()) {
LOGGER.warn("Database history was not found but was expected");
if (config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. " +
"Attempting to snapshot the current schema and then begin reading the redo log from the last recorded offset.",
OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
}
else {
throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to "
+ OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
}
schema.initializeStorage();
return;
}
schema.recover(Offsets.of(partition, offset));
}
}
......@@ -18,9 +18,11 @@ public class OracleCdcEngine extends AbstractCdcEngine<OracleCdcEngine> {
public OracleCdcEngine(OracleCdcEngineProperties props) {
super(props);
-debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, OracleConnector.class.getName());
+debeziumProps.setProperty(Constants.CONNECTOR_CLASS_PROP, CustomOracleConnector.class.getName());
debeziumProps.setProperty(Constants.DATABASE_DBNAME_PROP, props.getDbname());
debeziumProps.setProperty("log.mining.strategy", "online_catalog");
+// Intended to enable handling of garbled Chinese text in Oracle logs under a Spanish-locale environment
+// debeziumProps.setProperty("connection.factory.class", "com.tbyf.hip.cloud.service.config.CustomConnectionFactory");
String schema = props.getUsername().toUpperCase();
if (props.getSchemaList()!=null){
......
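The CustomConnectionFactory referenced in the commented-out connection.factory.class property above is not included in this commit. As a rough illustration only: a class registered through that property would typically implement Debezium's io.debezium.jdbc.JdbcConnection.ConnectionFactory interface and open the JDBC connection itself, which is the hook where locale- or charset-related settings can be applied before LogMiner queries run. The sketch below is hypothetical; the class name, URL construction, and locale overrides are assumptions, not the production fix.

package com.tbyf.hip.cloud.service.config;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;

// Hypothetical sketch only: the real CustomConnectionFactory is not part of this commit, and the
// locale/property choices below are assumptions about how the charset issue could be addressed.
public class CustomConnectionFactorySketch implements JdbcConnection.ConnectionFactory {

    @Override
    public Connection connect(JdbcConfiguration config) throws SQLException {
        // Build a plain thin-driver URL from the Debezium JDBC configuration.
        String url = "jdbc:oracle:thin:@" + config.getHostname() + ":" + config.getPort()
                + "/" + config.getDatabase();

        Properties props = new Properties();
        props.setProperty("user", config.getUser());
        if (config.getPassword() != null) {
            props.setProperty("password", config.getPassword());
        }

        // Assumption: pin the client locale so NLS-dependent conversions behave the same as in an
        // English/Chinese environment instead of the Spanish OS locale. Other approaches, such as
        // re-decoding column bytes with the database's real character set, are equally possible
        // and may be what the production factory actually does.
        System.setProperty("user.language", "en");
        System.setProperty("user.country", "US");

        return DriverManager.getConnection(url, props);
    }
}

With such a class on the classpath, the engine would activate it by uncommenting the connection.factory.class property shown in the hunk above; the task's start(Configuration) then copies that value into the JDBC configuration before creating the OracleConnection.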
package com.tbyf.cdcengine2.oracle;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Collect;
import org.apache.kafka.connect.data.Struct;
import java.time.Instant;
import java.util.Map;
class OracleEventMetadataProvider implements EventMetadataProvider {
@Override
public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
if (value == null) {
return null;
}
Struct sourceInfo = value.getStruct("source");
// Guard against records without a source block before reading from it
if (sourceInfo == null) {
return null;
}
Long timestamp = sourceInfo.getInt64("ts_ms");
return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
}
@Override
public Map<String, String> getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
if (value == null) {
return null;
}
Struct sourceInfo = value.getStruct("source");
if (sourceInfo == null) {
return null;
}
String scn = sourceInfo.getString("scn");
return Collect.hashMapOf("scn", scn == null ? "null" : scn);
}
@Override
public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
if (value == null) {
return null;
}
Struct sourceInfo = value.getStruct("source");
return sourceInfo == null ? null : sourceInfo.getString("txId");
}
}
package com.tbyf.cdcengine2.oracle;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.pipeline.spi.Partition;
import java.util.Collections;
import java.util.Set;
public class Provider implements Partition.Provider<OraclePartition> {
private final OracleConnectorConfig connectorConfig;
Provider(OracleConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
public Set<OraclePartition> getPartitions() {
return Collections.singleton(new OraclePartition(this.connectorConfig.getLogicalName()));
}
}