public void execute()

in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java [823:985]


    public void execute(ChangeEventSourceContext context) throws InterruptedException {
        if (!connectorConfig.getSnapshotMode().shouldStream()) {
            LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode());
            return;
        }
        taskContext.getSchema().assureNonEmptySchema();
        final Set<Operation> skippedOperations = connectorConfig.getSkippedOps();

        // Register our event handlers ...
        eventHandlers.put(EventType.STOP, this::handleServerStop);
        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);

        if (!skippedOperations.contains(Operation.CREATE)) {
            eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
            eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        }

        if (!skippedOperations.contains(Operation.UPDATE)) {
            eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
            eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
        }

        if (!skippedOperations.contains(Operation.DELETE)) {
            eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
            eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
        }

        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);

        // Conditionally register ROWS_QUERY handler to parse SQL statements.
        if (connectorConfig.includeSqlQuery()) {
            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
        }

        client.registerEventListener(connectorConfig.bufferSizeForStreamingChangeEventSource() == 0
                ? this::handleEvent
                : (new EventBuffer(connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context))::add);

        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
        client.registerEventListener(this::onEvent);
        if (LOGGER.isDebugEnabled()) {
            client.registerEventListener(this::logEvent);
        }

        final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
        metrics.setIsGtidModeEnabled(isGtidModeEnabled);

        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium
        // checkpoint.
        String availableServerGtidStr = connection.knownGtidSet();
        if (isGtidModeEnabled) {
            // The server is using GTIDs, so enable the handler ...
            eventHandlers.put(EventType.GTID, this::handleGtidEvent);

            // Now look at the GTID set from the server and what we've previously seen ...
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

            // also take into account purged GTID logs
            GtidSet purgedServerGtidSet = connection.purgedGtidSet();
            LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);

            GtidSet filteredGtidSet = filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
                LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                client.setGtidSet(filteredGtidSetStr);
                offsetContext.setCompletedGtidSet(filteredGtidSetStr);
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            } else {
                // We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning
                // ...
                client.setBinlogFilename(offsetContext.getSource().binlogFilename());
                client.setBinlogPosition(offsetContext.getSource().binlogPosition());
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        } else {
            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
            client.setBinlogFilename(offsetContext.getSource().binlogFilename());
            client.setBinlogPosition(offsetContext.getSource().binlogPosition());
        }

        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already
        // processed...
        initialEventsToSkip = offsetContext.eventsToSkipUponRestart();
        LOGGER.info("Skip {} events on streaming start", initialEventsToSkip);

        // Set the starting row number, which is the next row number to be read ...
        startingRowNumber = offsetContext.rowsToSkipUponRestart();
        LOGGER.info("Skip {} rows on streaming start", startingRowNumber);

        // Only when we reach the first BEGIN event will we start to skip events ...
        skipEvent = false;

        try {
            // Start the log reader, which starts background threads ...
            if (context.isRunning()) {
                long timeout = connectorConfig.getConnectionTimeout().toMillis();
                long started = clock.currentTimeInMillis();
                try {
                    LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
                    client.connect(timeout);
                    // Need to wait for keepalive thread to be running, otherwise it can be left orphaned
                    // The problem is with timing. When the close is called too early after connect then
                    // the keepalive thread is not terminated
                    if (client.isKeepAlive()) {
                        LOGGER.info("Waiting for keepalive thread to start");
                        final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
                        int waitAttempts = 50;
                        boolean keepAliveThreadRunning = false;
                        while (!keepAliveThreadRunning && waitAttempts-- > 0) {
                            for (Thread t : binaryLogClientThreads.values()) {
                                if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
                                    LOGGER.info("Keepalive thread is running");
                                    keepAliveThreadRunning = true;
                                }
                            }
                            metronome.pause();
                        }
                    }
                } catch (TimeoutException e) {
                    // If the client thread is interrupted *before* the client could connect, the client throws a
                    // timeout exception
                    // The only way we can distinguish this is if we get the timeout exception before the specified
                    // timeout has
                    // elapsed, so we simply check this (within 10%) ...
                    long duration = clock.currentTimeInMillis() - started;
                    if (duration > (0.9 * timeout)) {
                        double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
                        throw new DebeziumException(
                                "Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
                                        connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
                                        + connectorConfig.username() + "'",
                                e);
                    }
                    // Otherwise, we were told to shutdown, so we don't care about the timeout exception
                } catch (AuthenticationException e) {
                    throw new DebeziumException("Failed to authenticate to the MySQL database at " +
                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
                            + connectorConfig.username() + "'", e);
                } catch (Throwable e) {
                    throw new DebeziumException("Unable to connect to the MySQL database at " +
                            connectorConfig.hostname() + ":" + connectorConfig.port() + " with user '"
                            + connectorConfig.username() + "': " + e.getMessage(), e);
                }
            }
            while (context.isRunning()) {
                Thread.sleep(100);
            }
        } finally {
            try {
                client.disconnect();
            } catch (Exception e) {
                LOGGER.info("Exception while stopping binary log client", e);
            }
        }
    }