protected void handleQueryEvent()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [595:682]


    /**
     * Handles a binlog query event.
     *
     * <p>The statement carried by the event is processed as follows, in this order:
     *
     * <ul>
     *   <li>{@code BEGIN} starts the next transaction in the offset context, dispatches a
     *       transaction-started event and records the binlog thread id;
     *   <li>{@code COMMIT} is delegated to {@code handleTransactionCompletion};
     *   <li>statements starting with {@code XA }, statements matched by the configured DDL
     *       filter, and {@code INSERT}/{@code UPDATE}/{@code DELETE} statements are ignored
     *       (the latter with a warning);
     *   <li>anything else (including {@code ROLLBACK}, after a warning) is parsed as DDL and each
     *       resulting schema change event that is not skipped by the schema is dispatched.
     * </ul>
     *
     * @param partition the partition passed through to the dispatcher and the DDL parser
     * @param offsetContext the offset context that tracks the current transaction
     * @param event the binlog event whose data is a {@link QueryEventData}
     * @throws InterruptedException if one of the transaction boundary calls is interrupted; an
     *     interruption while dispatching schema change events is not rethrown, it is logged and
     *     the thread's interrupt status is restored instead
     */
    protected void handleQueryEvent(
            MySqlPartition partition, MySqlOffsetContext offsetContext, Event event)
            throws InterruptedException {
        QueryEventData command = unwrapData(event);
        LOGGER.debug("Received query command: {}", event);
        String sql = command.getSql().trim();
        if (sql.equalsIgnoreCase("BEGIN")) {
            // We are starting a new transaction ...
            offsetContext.startNextTransaction();
            eventDispatcher.dispatchTransactionStartedEvent(
                    partition, offsetContext.getTransactionId(), offsetContext);
            offsetContext.setBinlogThread(command.getThreadId());
            if (initialEventsToSkip != 0) {
                LOGGER.debug(
                        "Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
                        initialEventsToSkip,
                        startingRowNumber);
                // We are restarting, so we need to skip the events in this transaction that we
                // processed previously...
                skipEvent = true;
            }
            return;
        }
        if (sql.equalsIgnoreCase("COMMIT")) {
            handleTransactionCompletion(partition, offsetContext, event);
            return;
        }

        // Upper-case with Locale.ROOT so the keyword checks below do not depend on the JVM's
        // default locale (e.g. a Turkish locale maps 'i' to a dotted capital 'İ', which would
        // make "insert " fail the "INSERT " comparison).
        String upperCasedStatementBegin =
                Strings.getBegin(sql, 7).toUpperCase(java.util.Locale.ROOT);

        if (upperCasedStatementBegin.startsWith("XA ")) {
            // This is an XA transaction, and we currently ignore these and do nothing ...
            return;
        }
        if (connectorConfig.getDdlFilter().test(sql)) {
            LOGGER.debug("DDL '{}' was filtered out of processing", sql);
            return;
        }
        if (upperCasedStatementBegin.equals("INSERT ")
                || upperCasedStatementBegin.equals("UPDATE ")
                || upperCasedStatementBegin.equals("DELETE ")) {
            LOGGER.warn(
                    "Received DML '{}' for processing, binlog probably contains events generated with statement or mixed based replication format",
                    sql);
            return;
        }
        if (sql.equalsIgnoreCase("ROLLBACK")) {
            // We have hit a ROLLBACK which is not supported. There is deliberately no return
            // here: the statement still goes through the DDL parsing below.
            LOGGER.warn(
                    "Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering",
                    MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
        }

        final List<SchemaChangeEvent> schemaChangeEvents =
                taskContext
                        .getSchema()
                        .parseStreamingDdl(
                                partition,
                                sql,
                                command.getDatabase(),
                                offsetContext,
                                clock.currentTimeAsInstant());
        try {
            for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
                if (taskContext.getSchema().skipSchemaChangeEvent(schemaChangeEvent)) {
                    continue;
                }

                // Use the first table of the event as the dispatch key; events that touch no
                // table are dispatched with a null table id.
                final TableId tableId =
                        schemaChangeEvent.getTables().isEmpty()
                                ? null
                                : schemaChangeEvent.getTables().iterator().next().id();
                eventDispatcher.dispatchSchemaChangeEvent(
                        partition,
                        tableId,
                        (receiver) -> {
                            try {
                                receiver.schemaChangeEvent(schemaChangeEvent);
                            } catch (Exception e) {
                                throw new DebeziumException(e);
                            }
                        });
            }
        } catch (InterruptedException e) {
            LOGGER.info("Processing interrupted");
            // Restore the interrupt status so that callers can observe the interruption
            // instead of it being silently lost here.
            Thread.currentThread().interrupt();
        }
    }