in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [730:812]
/**
 * Handles a change event whose table could not be processed normally.
 *
 * <p>Two outcomes are possible:
 *
 * <ul>
 *   <li>If {@code tableId} is {@code null} or is not included by the connector's data collection
 *       filter, the event is treated as filtered: it is traced, counted via
 *       {@code metrics.onFilteredEvent} and handed to
 *       {@code eventDispatcher.dispatchFilteredEvent}. When {@code tableId} is {@code null} and
 *       the event carries write/update/delete rows data, the id used for reporting is looked up
 *       through {@code getExcludeTableId} on the task's schema.
 *   <li>Otherwise the event is counted via {@code metrics.onErroneousEvent} and reported
 *       according to {@code inconsistentSchemaHandlingMode}: {@code FAIL} logs an error and
 *       throws, {@code WARN} logs a warning, any other value logs at debug level.
 * </ul>
 *
 * @param partition the partition passed on to the metrics and the dispatcher
 * @param offsetContext supplies the offset and binlog file name for the log messages and is
 *     forwarded to the dispatcher for filtered events
 * @param event the binlog event being reported
 * @param tableId the table the event refers to; may be {@code null}
 * @param operation the operation passed on to the metrics
 * @throws DebeziumException if the table is included by the filter and the handling mode is
 *     {@code FAIL}
 * @throws InterruptedException declared by the signature; presumably raised while dispatching
 *     the filtered event (not verifiable from this method alone)
 */
private void informAboutUnknownTableIfRequired(
        MySqlPartition partition,
        MySqlOffsetContext offsetContext,
        Event event,
        TableId tableId,
        Operation operation)
        throws InterruptedException {
    final boolean includedByFilter =
            tableId != null
                    && connectorConfig
                            .getTableFilters()
                            .dataCollectionFilter()
                            .isIncluded(tableId);

    if (!includedByFilter) {
        // Filtered path: the table is either unknown (null) or excluded by configuration.
        TableId reportedId = tableId;
        if (reportedId == null) {
            // Only row events carry a numeric table id that can be resolved for reporting;
            // for every other payload the id stays null.
            final EventData payload = unwrapData(event);
            if (payload instanceof WriteRowsEventData) {
                reportedId =
                        taskContext
                                .getSchema()
                                .getExcludeTableId(((WriteRowsEventData) payload).getTableId());
            } else if (payload instanceof UpdateRowsEventData) {
                reportedId =
                        taskContext
                                .getSchema()
                                .getExcludeTableId(((UpdateRowsEventData) payload).getTableId());
            } else if (payload instanceof DeleteRowsEventData) {
                reportedId =
                        taskContext
                                .getSchema()
                                .getExcludeTableId(((DeleteRowsEventData) payload).getTableId());
            }
        }
        LOGGER.trace("Filtered {} event for {}", event.getHeader().getEventType(), reportedId);
        metrics.onFilteredEvent(partition, "source = " + reportedId, operation);
        eventDispatcher.dispatchFilteredEvent(partition, offsetContext);
        return;
    }

    // Erroneous path: the table is included by the filter, yet this method was asked to
    // report it as unknown.
    metrics.onErroneousEvent(partition, "source = " + tableId + ", event " + event, operation);
    final EventHeaderV4 header = event.getHeader();

    // Every branch below logs the same diagnostic values, so gather them once.
    final Object currentOffset = offsetContext.getOffset();
    final String newline = System.lineSeparator();
    final long startPosition = header.getPosition();
    final long stopPosition = header.getNextPosition();
    final String binlogFile = offsetContext.getSource().binlogFilename();

    if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
        LOGGER.error(
                "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
                        + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
                event,
                currentOffset,
                tableId,
                newline,
                startPosition,
                stopPosition,
                binlogFile);
        throw new DebeziumException(
                "Encountered change event for table "
                        + tableId
                        + " whose schema isn't known to this connector");
    }

    if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
        LOGGER.warn(
                "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
                        + "The event will be ignored.{}"
                        + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
                event,
                currentOffset,
                tableId,
                newline,
                newline,
                startPosition,
                stopPosition,
                binlogFile);
        return;
    }

    // Any other handling mode: same message as the warning, at debug level.
    LOGGER.debug(
            "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
                    + "The event will be ignored.{}"
                    + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
            event,
            currentOffset,
            tableId,
            newline,
            newline,
            startPosition,
            stopPosition,
            binlogFile);
}