public void execute()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [1015:1253]


    /**
     * Streams binlog events from the MySQL server until {@code context} stops running.
     *
     * <p>Returns immediately when the configured snapshot mode does not allow streaming.
     * Otherwise the method registers one handler per binlog event type (row handlers are left
     * out for operations listed in the connector's skipped operations), registers the event and
     * lifecycle listeners on the binlog client, positions the client either by GTID set or by
     * binlog filename/position, connects, and then polls {@code context.isRunning()} every 100 ms.
     * The binlog client is always disconnected before the method returns or throws.
     *
     * @param context used to decide whether streaming should start and keep going
     * @param partition partition handed to the event handlers
     * @param offsetContext offset to resume from; when {@code null} an initial offset built from
     *     the connector configuration is used instead
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     keepalive thread to start or while waiting for {@code context} to stop
     * @throws DebeziumException if connecting times out, authentication fails, or the connection
     *     cannot be established for any other reason
     */
    public void execute(
            ChangeEventSourceContext context,
            MySqlPartition partition,
            MySqlOffsetContext offsetContext)
            throws InterruptedException {
        if (!connectorConfig.getSnapshotMode().shouldStream()) {
            LOGGER.info(
                    "Streaming is disabled for snapshot mode {}",
                    connectorConfig.getSnapshotMode());
            return;
        }
        if (connectorConfig.getSnapshotMode() != MySqlConnectorConfig.SnapshotMode.NEVER) {
            taskContext.getSchema().assureNonEmptySchema();
        }
        final Set<Operation> skippedOperations = connectorConfig.getSkippedOperations();

        // Fall back to an initial offset when the caller did not supply one, so that every
        // handler below can rely on a non-null offset context.
        final MySqlOffsetContext effectiveOffsetContext =
                offsetContext != null ? offsetContext : MySqlOffsetContext.initial(connectorConfig);

        // Register our event handlers ...
        eventHandlers.put(
                EventType.STOP, (event) -> handleServerStop(effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.HEARTBEAT,
                (event) -> handleServerHeartbeat(partition, effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.INCIDENT,
                (event) -> handleServerIncident(partition, effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.ROTATE, (event) -> handleRotateLogsEvent(effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.TABLE_MAP,
                (event) -> handleUpdateTableMetadata(partition, effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.QUERY,
                (event) -> handleQueryEvent(partition, effectiveOffsetContext, event));

        // Row-level handlers are only registered for operations that are not skipped; both the
        // plain and the EXT_ variant of each row event type share the same handler.
        if (!skippedOperations.contains(Operation.CREATE)) {
            eventHandlers.put(
                    EventType.WRITE_ROWS,
                    (event) -> handleInsert(partition, effectiveOffsetContext, event));
            eventHandlers.put(
                    EventType.EXT_WRITE_ROWS,
                    (event) -> handleInsert(partition, effectiveOffsetContext, event));
        }

        if (!skippedOperations.contains(Operation.UPDATE)) {
            eventHandlers.put(
                    EventType.UPDATE_ROWS,
                    (event) -> handleUpdate(partition, effectiveOffsetContext, event));
            eventHandlers.put(
                    EventType.EXT_UPDATE_ROWS,
                    (event) -> handleUpdate(partition, effectiveOffsetContext, event));
        }

        if (!skippedOperations.contains(Operation.DELETE)) {
            eventHandlers.put(
                    EventType.DELETE_ROWS,
                    (event) -> handleDelete(partition, effectiveOffsetContext, event));
            eventHandlers.put(
                    EventType.EXT_DELETE_ROWS,
                    (event) -> handleDelete(partition, effectiveOffsetContext, event));
        }

        eventHandlers.put(
                EventType.VIEW_CHANGE, (event) -> viewChange(effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.XA_PREPARE, (event) -> prepareTransaction(effectiveOffsetContext, event));
        eventHandlers.put(
                EventType.XID,
                (event) -> handleTransactionCompletion(partition, effectiveOffsetContext, event));

        // Conditionally register ROWS_QUERY handler to parse SQL statements.
        if (connectorConfig.includeSqlQuery()) {
            eventHandlers.put(
                    EventType.ROWS_QUERY,
                    (event) -> handleRowsQuery(effectiveOffsetContext, event));
        }

        // A buffer size of 0 means events go straight to handleEvent; otherwise they are routed
        // through an EventBuffer of the configured size.
        BinaryLogClient.EventListener listener;
        if (connectorConfig.bufferSizeForStreamingChangeEventSource() == 0) {
            listener = (event) -> handleEvent(partition, effectiveOffsetContext, event);
        } else {
            EventBuffer buffer =
                    new EventBuffer(
                            connectorConfig.bufferSizeForStreamingChangeEventSource(),
                            this,
                            context);
            listener = (event) -> buffer.add(partition, effectiveOffsetContext, event);
        }
        client.registerEventListener(listener);

        client.registerLifecycleListener(new ReaderThreadLifecycleListener(effectiveOffsetContext));
        client.registerEventListener((event) -> onEvent(effectiveOffsetContext, event));
        if (LOGGER.isDebugEnabled()) {
            client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event));
        }

        final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
        metrics.setIsGtidModeEnabled(isGtidModeEnabled);

        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of
        // the last Debezium checkpoint.
        String availableServerGtidStr = connection.knownGtidSet();
        if (isGtidModeEnabled) {
            // The server is using GTIDs, so enable the handler ...
            eventHandlers.put(
                    EventType.GTID, (event) -> handleGtidEvent(effectiveOffsetContext, event));

            // Now look at the GTID set from the server and what we've previously seen ...
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

            // also take into account purged GTID logs
            GtidSet purgedServerGtidSet = connection.purgedGtidSet();
            LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);

            GtidSet filteredGtidSet =
                    filterGtidSet(
                            effectiveOffsetContext, availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
                LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                client.setGtidSet(filteredGtidSetStr);
                effectiveOffsetContext.setCompletedGtidSet(filteredGtidSetStr);
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            } else {
                // We've not yet seen any GTIDs, so that means we have to start reading the binlog
                // from the beginning ...
                client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename());
                client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition());
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        } else {
            // The server is not using GTIDs, so start reading the binlog based upon where we last
            // left off ...
            client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename());
            client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition());
        }

        // We may be restarting in the middle of a transaction, so see how far into the transaction
        // we have already processed...
        initialEventsToSkip = effectiveOffsetContext.eventsToSkipUponRestart();
        LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);

        // Set the starting row number, which is the next row number to be read ...
        startingRowNumber = effectiveOffsetContext.rowsToSkipUponRestart();
        LOGGER.info("Skip {} rows on streaming start", startingRowNumber);

        // Only when we reach the first BEGIN event will we start to skip events ...
        skipEvent = false;

        try {
            // Start the log reader, which starts background threads ...
            if (context.isRunning()) {
                long timeout = connectorConfig.getConnectionTimeout().toMillis();
                long started = clock.currentTimeInMillis();
                try {
                    LOGGER.debug(
                            "Attempting to establish binlog reader connection with timeout of {} ms",
                            timeout);
                    client.connect(timeout);
                    // Need to wait for the keepalive thread to be running, otherwise it can be
                    // left orphaned. The problem is with timing: when close is called too early
                    // after connect, the keepalive thread is not terminated.
                    if (client.isKeepAlive()) {
                        LOGGER.info("Waiting for keepalive thread to start");
                        final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
                        // Up to 50 checks, pausing 100 ms after each one.
                        int waitAttempts = 50;
                        boolean keepAliveThreadRunning = false;
                        while (!keepAliveThreadRunning && waitAttempts-- > 0) {
                            for (Thread t : binaryLogClientThreads.values()) {
                                if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
                                    LOGGER.info("Keepalive thread is running");
                                    keepAliveThreadRunning = true;
                                }
                            }
                            metronome.pause();
                        }
                    }
                } catch (TimeoutException e) {
                    // If the client thread is interrupted *before* the client could connect, the
                    // client throws a timeout exception. The only way we can distinguish this is
                    // if we get the timeout exception before the specified timeout has elapsed,
                    // so we simply check this (within 10%) ...
                    long duration = clock.currentTimeInMillis() - started;
                    if (duration > (0.9 * timeout)) {
                        // Floating-point division keeps the fractional part of the elapsed time;
                        // converting through whole seconds would always report "N.0 seconds".
                        double actualSeconds = duration / 1000.0;
                        throw new DebeziumException(
                                "Timed out after "
                                        + actualSeconds
                                        + " seconds while waiting to connect to MySQL at "
                                        + connectorConfig.hostname()
                                        + ":"
                                        + connectorConfig.port()
                                        + " with user '"
                                        + connectorConfig.username()
                                        + "'",
                                e);
                    }
                    // Otherwise, we were told to shutdown, so we don't care about the timeout
                    // exception
                } catch (AuthenticationException e) {
                    throw new DebeziumException(
                            "Failed to authenticate to the MySQL database at "
                                    + connectorConfig.hostname()
                                    + ":"
                                    + connectorConfig.port()
                                    + " with user '"
                                    + connectorConfig.username()
                                    + "'",
                            e);
                } catch (InterruptedException e) {
                    // Interrupted while pausing for the keepalive thread: the connection itself
                    // did not fail, so propagate the interruption unchanged instead of letting
                    // the catch-all below report it as "Unable to connect".
                    throw e;
                } catch (Throwable e) {
                    throw new DebeziumException(
                            "Unable to connect to the MySQL database at "
                                    + connectorConfig.hostname()
                                    + ":"
                                    + connectorConfig.port()
                                    + " with user '"
                                    + connectorConfig.username()
                                    + "': "
                                    + e.getMessage(),
                            e);
                }
            }
            // Keep this thread parked until the context reports that it is no longer running;
            // the method returns (and the client is disconnected) only after that.
            while (context.isRunning()) {
                Thread.sleep(100);
            }
        } finally {
            // Best-effort disconnect: a failure here is logged and not rethrown so that it cannot
            // mask an exception already propagating out of the try block.
            try {
                client.disconnect();
            } catch (Exception e) {
                LOGGER.info("Exception while stopping binary log client", e);
            }
        }
    }