in rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java [110:155]
private void doProcess() {
while (true) {
try {
Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
if (event == null) {
checkConnection();
continue;
}
switch (event.getHeader().getEventType()) {
case TABLE_MAP:
processTableMapEvent(event);
break;
case WRITE_ROWS:
case EXT_WRITE_ROWS:
processWriteEvent(event);
break;
case UPDATE_ROWS:
case EXT_UPDATE_ROWS:
processUpdateEvent(event);
break;
case DELETE_ROWS:
case EXT_DELETE_ROWS:
processDeleteEvent(event);
break;
case QUERY:
processQueryEvent(event);
break;
case XID:
processXidEvent(event);
break;
}
} catch (Exception e) {
LOGGER.error("Binlog process error.", e);
}
}
}