Commit 49d964a3 by y1sa

新增失败回调

parent 56eb47c3
...@@ -41,7 +41,9 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -41,7 +41,9 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
private volatile boolean running = true; private volatile boolean running = true;
private volatile List<ChangeListener> changeListeners = new ArrayList<>(); private ChangeHandler changeHandler;
private Runnable errorHandler;
public AbstractCdcEngine() { public AbstractCdcEngine() {
debeziumProps.setProperty(Constants.DATABASE_SERVER_NAME_PROP, Constants.DATABASE_SERVER_NAME_VALUE); debeziumProps.setProperty(Constants.DATABASE_SERVER_NAME_PROP, Constants.DATABASE_SERVER_NAME_VALUE);
...@@ -49,42 +51,47 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -49,42 +51,47 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
debeziumProps.setProperty("name", getClass().getSimpleName()); debeziumProps.setProperty("name", getClass().getSimpleName());
} }
public void addListener(ChangeListener listener) { @SuppressWarnings("unchecked")
ArrayList<ChangeListener> changeListeners = new ArrayList<>(this.changeListeners); private T self() {
changeListeners.add(listener); return (T) this;
this.changeListeners = changeListeners; }
public T onChange(ChangeHandler changeHandler) {
if (changeHandler == null) {
throw new IllegalArgumentException("changeHandler must not be null");
}
this.changeHandler = changeHandler;
return self();
}
public T onError(Runnable errorHandler) {
this.errorHandler = errorHandler;
return self();
} }
public void start() { public void start() {
synchronized (startupStopMonitor) { synchronized (startupStopMonitor) {
logger.info("开始启动CDC引擎..."); logger.info("开始启动CDC引擎...");
if (this.changeHandler == null) {
throw new IllegalStateException("changeHandler not set");
}
this.running = true; this.running = true;
consumerThread = new Thread(() -> { consumerThread = new Thread(() -> {
List<ChangeListener> changeListeners = null;
while (running) { while (running) {
changeListeners = this.changeListeners;
try { try {
List<ChangedRecord> buffer = new ArrayList<>(); List<ChangedRecord> buffer = new ArrayList<>();
recordQueue.drainTo(buffer); recordQueue.drainTo(buffer);
if (buffer.isEmpty()) { if (buffer.isEmpty()) {
ChangedRecord record = recordQueue.take(); ChangedRecord record = recordQueue.take();
try { try {
for (ChangeListener listener : changeListeners) { this.changeHandler.handle(record);
if (listener.supports(record.schema(), record.table())) {
listener.onChange(record);
}
}
} catch (Exception e) { } catch (Exception e) {
logger.error("处理{}时发生了异常", record, e); logger.error("处理{}时发生了异常", record, e);
} }
} else { } else {
for (ChangedRecord record : buffer) { for (ChangedRecord record : buffer) {
try { try {
for (ChangeListener listener : changeListeners) { changeHandler.handle(record);
if (listener.supports(record.schema(), record.table())) {
listener.onChange(record);
}
}
} catch (Exception e) { } catch (Exception e) {
logger.error("处理{}时发生了异常", record, e); logger.error("处理{}时发生了异常", record, e);
} }
...@@ -98,14 +105,9 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -98,14 +105,9 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
List<ChangedRecord> buffer = new ArrayList<>(); List<ChangedRecord> buffer = new ArrayList<>();
recordQueue.drainTo(buffer); recordQueue.drainTo(buffer);
if (!buffer.isEmpty()) { if (!buffer.isEmpty()) {
changeListeners = this.changeListeners;
for (ChangedRecord record : buffer) { for (ChangedRecord record : buffer) {
try { try {
for (ChangeListener listener : changeListeners) { changeHandler.handle(record);
if (listener.supports(record.schema(), record.table())) {
listener.onChange(record);
}
}
} catch (Exception e) { } catch (Exception e) {
logger.error("处理{}时发生了异常", record, e); logger.error("处理{}时发生了异常", record, e);
} }
...@@ -166,6 +168,11 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> { ...@@ -166,6 +168,11 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
consumerThread.interrupt(); consumerThread.interrupt();
} }
this.consumerThread = null; this.consumerThread = null;
Runnable errorHandler = this.errorHandler;
if (errorHandler != null) {
errorHandler.run();
}
} }
} }
......
package com.tbyf.cdcengine2.core;
public interface ChangeHandler {
void handle(ChangedRecord record);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment