public void execute()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/io/debezium/connector/db2/Db2StreamingChangeEventSource.java [109:357]


    /**
     * Streams DB2 change-table rows as data change events for as long as {@code context} reports
     * that it is running.
     *
     * <p>Each pass reads the current maximum LSN, derives the lower bound of the query window from
     * the last processed commit LSN, refreshes the queried change tables when new ones are
     * reported for that window, and then merges the per-table result sets by repeatedly taking
     * the pointer that compares smallest. Changes positioned before the offset recorded at
     * start-up, and the leading events of the transaction sitting exactly at that offset up to the
     * recorded event serial number, are skipped.
     *
     * <p>Returns immediately when the configured snapshot mode does not request streaming. A
     * {@code SQLException} raised while querying a window is passed to
     * {@code processErrorFromChangeTableQuery}; any other {@code Exception} raised inside the
     * polling loop is handed to {@code errorHandler} instead of being rethrown.
     *
     * @param context queried through {@code isRunning()} to decide whether to keep polling
     * @param partition forwarded to table discovery, schema migration and event dispatching
     * @param offsetContext supplies the start position and is updated for every emitted change
     * @throws InterruptedException declared for callers; exceptions raised inside the polling
     *     loop are caught and reported to {@code errorHandler}
     */
    public void execute(
            ChangeEventSourceContext context,
            Db2Partition partition,
            Db2OffsetContext offsetContext)
            throws InterruptedException {
        if (!connectorConfig.getSnapshotMode().shouldStream()) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return;
        }

        final Metronome pacer = Metronome.sleeper(pollInterval, clock);
        // Change tables awaiting a schema migration, ordered by their stop LSN.
        final Queue<Db2ChangeTable> pendingSchemaChanges =
                new PriorityQueue<>(
                        (left, right) -> left.getStopLsn().compareTo(right.getStopLsn()));
        try {
            final AtomicReference<Db2ChangeTable[]> activeTables =
                    new AtomicReference<>(getCdcTablesToQuery(partition, offsetContext));

            final TxLogPosition startPosition = offsetContext.getChangePosition();
            final long startEventSerialNo = offsetContext.getEventSerialNo();
            LOGGER.info(
                    "Last position recorded in offsets is {}[{}]",
                    startPosition,
                    startEventSerialNo);

            TxLogPosition lastHandledPosition = startPosition;

            // Only move the lower bound past the last commit LSN on the very first pass when the
            // snapshot has completed; otherwise a transaction that was streamed only partially
            // before a restart could be skipped.
            boolean advanceFromLsn = offsetContext.isSnapshotCompleted();
            while (context.isRunning()) {
                final Lsn maxLsn = dataConnection.getMaxLsn();

                // Not expected while the agent is running, but guard against it anyway.
                if (!maxLsn.isAvailable()) {
                    LOGGER.warn(
                            "No maximum LSN recorded in the database; please ensure that the DB2 Agent is running");
                    pacer.pause();
                    continue;
                }
                // Nothing new has been committed since the previous pass.
                if (maxLsn.equals(lastHandledPosition.getCommitLsn()) && advanceFromLsn) {
                    LOGGER.debug("No change in the database");
                    pacer.pause();
                    continue;
                }

                // The query interval is inclusive, so step past the last commit LSN, except on
                // the first pass where its transaction may not have been streamed completely.
                final Lsn fromLsn;
                if (lastHandledPosition.getCommitLsn().isAvailable() && advanceFromLsn) {
                    fromLsn = dataConnection.incrementLsn(lastHandledPosition.getCommitLsn());
                } else {
                    fromLsn = lastHandledPosition.getCommitLsn();
                }
                advanceFromLsn = true;

                // Apply every migration left over from the previous window before querying.
                while (!pendingSchemaChanges.isEmpty()) {
                    migrateTable(partition, offsetContext, pendingSchemaChanges);
                }
                if (!dataConnection.listOfNewChangeTables(fromLsn, maxLsn).isEmpty()) {
                    final Db2ChangeTable[] refreshed =
                            getCdcTablesToQuery(partition, offsetContext);
                    activeTables.set(refreshed);
                    for (Db2ChangeTable table : refreshed) {
                        if (table.getStartLsn().isBetween(fromLsn, maxLsn)) {
                            LOGGER.info("Schema will be changed for {}", table);
                            pendingSchemaChanges.add(table);
                        }
                    }
                }
                try {
                    dataConnection.getChangesForTables(
                            activeTables.get(),
                            fromLsn,
                            maxLsn,
                            resultSets -> {
                                // Counts events seen in the transaction located at startPosition.
                                long serialNoInFirstTx = 1;
                                final Db2ChangeTable[] queriedTables = activeTables.get();
                                final ChangeTablePointer[] pointers =
                                        new ChangeTablePointer[resultSets.length];

                                for (int idx = 0; idx < pointers.length; idx++) {
                                    pointers[idx] =
                                            new ChangeTablePointer(
                                                    queriedTables[idx], resultSets[idx]);
                                    pointers[idx].next();
                                }

                                while (true) {
                                    // Pick the unfinished pointer that compares smallest.
                                    ChangeTablePointer current = null;
                                    for (ChangeTablePointer pointer : pointers) {
                                        if (!pointer.isCompleted()
                                                && (current == null
                                                        || pointer.compareTo(current) < 0)) {
                                            current = pointer;
                                        }
                                    }
                                    if (current == null) {
                                        // Every pointer is exhausted.
                                        break;
                                    }

                                    if (!current.getChangePosition().isAvailable()
                                            || !current.getChangePosition()
                                                    .getInTxLsn()
                                                    .isAvailable()) {
                                        LOGGER.error(
                                                "Skipping change {} as its LSN is NULL which is not expected",
                                                current);
                                        current.next();
                                        continue;
                                    }
                                    // Restart case: the change precedes the last committed offset.
                                    if (current.getChangePosition().compareTo(startPosition) < 0) {
                                        LOGGER.info(
                                                "Skipping change {} as its position is smaller than the last recorded position {}",
                                                current,
                                                startPosition);
                                        current.next();
                                        continue;
                                    }
                                    // Restart case: the change sits at the last committed offset
                                    // and its serial number was already recorded.
                                    if (current.getChangePosition().compareTo(startPosition) == 0
                                            && serialNoInFirstTx <= startEventSerialNo) {
                                        LOGGER.info(
                                                "Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]",
                                                current,
                                                serialNoInFirstTx,
                                                startPosition,
                                                startEventSerialNo);
                                        serialNoInFirstTx++;
                                        current.next();
                                        continue;
                                    }
                                    // The change table stopped at or before this commit LSN.
                                    if (current.getChangeTable().getStopLsn().isAvailable()
                                            && current.getChangeTable()
                                                            .getStopLsn()
                                                            .compareTo(
                                                                    current.getChangePosition()
                                                                            .getCommitLsn())
                                                    <= 0) {
                                        LOGGER.debug(
                                                "Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}",
                                                current,
                                                current.getChangePosition());
                                        current.next();
                                        continue;
                                    }
                                    LOGGER.trace("Processing change {}", current);
                                    // Migrate once the stream reaches the earliest pending stop
                                    // LSN.
                                    if (!pendingSchemaChanges.isEmpty()
                                            && current.getChangePosition()
                                                            .getCommitLsn()
                                                            .compareTo(
                                                                    pendingSchemaChanges
                                                                            .peek()
                                                                            .getStopLsn())
                                                    >= 0) {
                                        migrateTable(
                                                partition, offsetContext, pendingSchemaChanges);
                                    }
                                    final TableId tableId =
                                            current.getChangeTable().getSourceTableId();
                                    final int operation = current.getOperation();
                                    final Object[] rowData = current.getData();

                                    // An UPDATE arrives as two consecutive rows: the image before
                                    // the update followed by the image after it.
                                    final int eventCount;
                                    final Object[] updatedRowData;
                                    if (operation == Db2ChangeRecordEmitter.OP_UPDATE_BEFORE) {
                                        if (!current.next()
                                                || current.getOperation()
                                                        != Db2ChangeRecordEmitter.OP_UPDATE_AFTER) {
                                            throw new IllegalStateException(
                                                    "The update before event at "
                                                            + current.getChangePosition()
                                                            + " for table "
                                                            + tableId
                                                            + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                                        }
                                        eventCount = 2;
                                        updatedRowData = current.getData();
                                    } else {
                                        eventCount = 1;
                                        updatedRowData = null;
                                    }

                                    offsetContext.setChangePosition(
                                            current.getChangePosition(), eventCount);
                                    offsetContext.event(
                                            current.getChangeTable().getSourceTableId(),
                                            metadataConnection.timestampOfLsn(
                                                    current.getChangePosition().getCommitLsn()));

                                    dispatcher.dispatchDataChangeEvent(
                                            partition,
                                            tableId,
                                            new Db2ChangeRecordEmitter(
                                                    partition,
                                                    offsetContext,
                                                    operation,
                                                    rowData,
                                                    updatedRowData,
                                                    clock));
                                    current.next();
                                }
                            });
                    lastHandledPosition = TxLogPosition.valueOf(maxLsn);
                    // End the transaction; otherwise CDC could not be disabled for tables.
                    dataConnection.rollback();
                    // Hook deciding whether streaming continues during the db2 cdc snapshot phase.
                    afterHandleLsn(partition, maxLsn);
                } catch (SQLException sqlError) {
                    activeTables.set(
                            processErrorFromChangeTableQuery(sqlError, activeTables.get()));
                }
            }
        } catch (Exception failure) {
            errorHandler.setProducerThrowable(failure);
        }
    }