Commit 56eb47c3 by y1sa

新增事件机制

parent 29f7a9ad
......@@ -70,7 +70,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
ChangedRecord record = recordQueue.take();
try {
for (ChangeListener listener : changeListeners) {
if (listener.supports(record)) {
if (listener.supports(record.schema(), record.table())) {
listener.onChange(record);
}
}
......@@ -81,7 +81,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
for (ChangedRecord record : buffer) {
try {
for (ChangeListener listener : changeListeners) {
if (listener.supports(record)) {
if (listener.supports(record.schema(), record.table())) {
listener.onChange(record);
}
}
......@@ -102,7 +102,7 @@ public abstract class AbstractCdcEngine<T extends AbstractCdcEngine<T>> {
for (ChangedRecord record : buffer) {
try {
for (ChangeListener listener : changeListeners) {
if (listener.supports(record)) {
if (listener.supports(record.schema(), record.table())) {
listener.onChange(record);
}
}
......
......@@ -2,9 +2,17 @@ package com.tbyf.cdcengine2.core;
public interface ChangeListener {
default boolean supports(ChangedRecord record) {
default boolean supportsSchema(String schema) {
return true;
}
default boolean supportsTable(String table) {
return true;
}
default boolean supports(String schema, String table) {
return supportsSchema(schema) && supportsTable(table);
}
void onChange(ChangedRecord record);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment