private void informAboutUnknownTableIfRequired()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [730:812]


    private void informAboutUnknownTableIfRequired(
            MySqlPartition partition,
            MySqlOffsetContext offsetContext,
            Event event,
            TableId tableId,
            Operation operation)
            throws InterruptedException {
        if (tableId != null
                && connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
            metrics.onErroneousEvent(
                    partition, "source = " + tableId + ", event " + event, operation);
            EventHeaderV4 eventHeader = event.getHeader();

            if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
                LOGGER.error(
                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
                        event,
                        offsetContext.getOffset(),
                        tableId,
                        System.lineSeparator(),
                        eventHeader.getPosition(),
                        eventHeader.getNextPosition(),
                        offsetContext.getSource().binlogFilename());
                throw new DebeziumException(
                        "Encountered change event for table "
                                + tableId
                                + " whose schema isn't known to this connector");
            } else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
                LOGGER.warn(
                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
                                + "The event will be ignored.{}"
                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
                        event,
                        offsetContext.getOffset(),
                        tableId,
                        System.lineSeparator(),
                        System.lineSeparator(),
                        eventHeader.getPosition(),
                        eventHeader.getNextPosition(),
                        offsetContext.getSource().binlogFilename());
            } else {
                LOGGER.debug(
                        "Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
                                + "The event will be ignored.{}"
                                + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
                        event,
                        offsetContext.getOffset(),
                        tableId,
                        System.lineSeparator(),
                        System.lineSeparator(),
                        eventHeader.getPosition(),
                        eventHeader.getNextPosition(),
                        offsetContext.getSource().binlogFilename());
            }
        } else {
            if (tableId == null) {
                EventData eventData = unwrapData(event);
                if (eventData instanceof WriteRowsEventData) {
                    tableId =
                            taskContext
                                    .getSchema()
                                    .getExcludeTableId(
                                            ((WriteRowsEventData) eventData).getTableId());
                } else if (eventData instanceof UpdateRowsEventData) {
                    tableId =
                            taskContext
                                    .getSchema()
                                    .getExcludeTableId(
                                            ((UpdateRowsEventData) eventData).getTableId());
                } else if (eventData instanceof DeleteRowsEventData) {
                    tableId =
                            taskContext
                                    .getSchema()
                                    .getExcludeTableId(
                                            ((DeleteRowsEventData) eventData).getTableId());
                }
            }
            LOGGER.trace("Filtered {} event for {}", event.getHeader().getEventType(), tableId);
            metrics.onFilteredEvent(partition, "source = " + tableId, operation);
            eventDispatcher.dispatchFilteredEvent(partition, offsetContext);
        }
    }