in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [595:682]
/**
 * Handles a binlog QUERY event by inspecting the SQL text it carries.
 *
 * <p>The statement is classified in the following order, and the first match wins:
 *
 * <ol>
 *   <li>{@code BEGIN}: starts the next transaction on the offset context, dispatches a
 *       transaction-started event, and records the binlog thread id. When
 *       {@code initialEventsToSkip} is non-zero, {@code skipEvent} is set.
 *   <li>{@code COMMIT}: delegates to {@code handleTransactionCompletion}.
 *   <li>Statements starting with {@code XA }: ignored.
 *   <li>Statements matched by the connector's DDL filter: ignored.
 *   <li>Statements starting with {@code INSERT }, {@code UPDATE } or {@code DELETE }: a warning
 *       is logged and the statement is ignored.
 *   <li>Anything else (including {@code ROLLBACK}, which additionally logs a warning) is handed
 *       to the schema's streaming DDL parser, and each resulting schema change event that is
 *       not skipped by the schema is dispatched.
 * </ol>
 *
 * @param partition the partition passed through to the event dispatcher and the DDL parser
 * @param offsetContext the offset context updated on transaction start and passed to the
 *     dispatcher and the DDL parser
 * @param event the binlog event whose data is a {@code QueryEventData}
 * @throws InterruptedException if dispatching the transaction-started event (or transaction
 *     completion handling) is interrupted; an interruption while dispatching schema change
 *     events is logged and the thread's interrupt flag is restored instead of rethrowing
 */
protected void handleQueryEvent(
        MySqlPartition partition, MySqlOffsetContext offsetContext, Event event)
        throws InterruptedException {
    QueryEventData command = unwrapData(event);
    LOGGER.debug("Received query command: {}", event);
    String sql = command.getSql().trim();

    if (sql.equalsIgnoreCase("BEGIN")) {
        // We are starting a new transaction ...
        offsetContext.startNextTransaction();
        eventDispatcher.dispatchTransactionStartedEvent(
                partition, offsetContext.getTransactionId(), offsetContext);
        offsetContext.setBinlogThread(command.getThreadId());
        if (initialEventsToSkip != 0) {
            LOGGER.debug(
                    "Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
                    initialEventsToSkip,
                    startingRowNumber);
            // We are restarting, so we need to skip the events in this transaction that we
            // processed previously...
            skipEvent = true;
        }
        return;
    }

    if (sql.equalsIgnoreCase("COMMIT")) {
        handleTransactionCompletion(partition, offsetContext, event);
        return;
    }

    // Upper-case with a fixed locale: with a default locale such as Turkish, "insert "
    // would become "İNSERT " and the keyword comparisons below would silently fail.
    String upperCasedStatementBegin =
            Strings.getBegin(sql, 7).toUpperCase(java.util.Locale.ROOT);

    if (upperCasedStatementBegin.startsWith("XA ")) {
        // This is an XA transaction, and we currently ignore these and do nothing ...
        return;
    }

    if (connectorConfig.getDdlFilter().test(sql)) {
        LOGGER.debug("DDL '{}' was filtered out of processing", sql);
        return;
    }

    if (upperCasedStatementBegin.equals("INSERT ")
            || upperCasedStatementBegin.equals("UPDATE ")
            || upperCasedStatementBegin.equals("DELETE ")) {
        LOGGER.warn(
                "Received DML '{}' for processing, binlog probably contains events generated with statement or mixed based replication format",
                sql);
        return;
    }

    if (sql.equalsIgnoreCase("ROLLBACK")) {
        // We have hit a ROLLBACK which is not supported. Only a warning is emitted here;
        // the statement still falls through to the DDL parsing below.
        LOGGER.warn(
                "Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering",
                MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
    }

    final List<SchemaChangeEvent> schemaChangeEvents =
            taskContext
                    .getSchema()
                    .parseStreamingDdl(
                            partition,
                            sql,
                            command.getDatabase(),
                            offsetContext,
                            clock.currentTimeAsInstant());
    try {
        for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
            if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
                continue;
            }

            // Use the first table of the event as the dispatch key, or null when the
            // schema change is not associated with any table.
            final TableId tableId =
                    schemaChangeEvent.getTables().isEmpty()
                            ? null
                            : schemaChangeEvent.getTables().iterator().next().id();
            eventDispatcher.dispatchSchemaChangeEvent(
                    partition,
                    tableId,
                    (receiver) -> {
                        try {
                            receiver.schemaChangeEvent(schemaChangeEvent);
                        } catch (Exception e) {
                            throw new DebeziumException(e);
                        }
                    });
        }
    } catch (InterruptedException e) {
        LOGGER.info("Processing interrupted");
        // Restore the interrupt flag so that callers further up the stack can observe the
        // interruption instead of having it silently swallowed here.
        Thread.currentThread().interrupt();
    }
}