public boolean executeIteration()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java [149:463]


    /**
     * Runs one streaming iteration for the given partition.
     *
     * <p>The iteration determines an LSN window ({@code fromLsn}..{@code toLsn}), queries the
     * change tables for that window, merges the per-table result sets by always taking the pointer
     * that compares smallest, and dispatches each surviving change through the dispatcher while
     * updating {@code offsetContext}. Per-partition state (schema change checkpoints, the table
     * slot, the last processed position and two flags) is kept in a {@link
     * SqlServerStreamingExecutionContext} that is created on the first call for a partition and
     * reused afterwards.
     *
     * <p>Any {@link Exception} raised inside the main {@code try} block (which includes {@link
     * InterruptedException}) is handed to {@code errorHandler.setProducerThrowable} instead of
     * being propagated; the method then returns {@code true}.
     *
     * @param context source context; the iteration body only runs while {@code
     *     context.isRunning()} is true
     * @param partition partition whose database is streamed
     * @param offsetContext offset state that is read at the start and updated for every dispatched
     *     change
     * @return {@code false} when the snapshot mode is {@code INITIAL_ONLY}, when no maximum LSN is
     *     available, or when no change is detected in the database; {@code true} in every other
     *     case
     * @throws InterruptedException declared by the signature
     */
    public boolean executeIteration(
            ChangeEventSourceContext context,
            SqlServerPartition partition,
            SqlServerOffsetContext offsetContext)
            throws InterruptedException {
        if (connectorConfig.getSnapshotMode().equals(SnapshotMode.INITIAL_ONLY)) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return false;
        }

        final String databaseName = partition.getDatabaseName();

        try {
            // Look the context up once and only build a new one when the partition has none yet;
            // this avoids allocating a throw-away context (queue, atomics) on every iteration.
            SqlServerStreamingExecutionContext registeredContext =
                    streamingExecutionContexts.get(partition);
            if (registeredContext == null) {
                registeredContext =
                        new SqlServerStreamingExecutionContext(
                                new PriorityQueue<>(
                                        (x, y) -> x.getStopLsn().compareTo(y.getStopLsn())),
                                new AtomicReference<>(),
                                offsetContext.getChangePosition(),
                                new AtomicBoolean(false),
                                // LSN should be increased for the first run only immediately
                                // after snapshot completion
                                // otherwise we might skip an incomplete transaction after
                                // restart
                                offsetContext.isSnapshotCompleted());
                streamingExecutionContexts.put(partition, registeredContext);
                LOGGER.info(
                        "Last position recorded in offsets is {}[{}]",
                        offsetContext.getChangePosition(),
                        offsetContext.getEventSerialNo());
            }
            final SqlServerStreamingExecutionContext streamingExecutionContext = registeredContext;

            final Queue<SqlServerChangeTable> schemaChangeCheckpoints =
                    streamingExecutionContext.getSchemaChangeCheckpoints();
            final AtomicReference<SqlServerChangeTable[]> tablesSlot =
                    streamingExecutionContext.getTablesSlot();
            // Offset state as it was when this iteration started; used below to skip changes that
            // were already recorded.
            final TxLogPosition lastProcessedPositionOnStart = offsetContext.getChangePosition();
            final long lastProcessedEventSerialNoOnStart = offsetContext.getEventSerialNo();
            final AtomicBoolean changesStoppedBeingMonotonic =
                    streamingExecutionContext.getChangesStoppedBeingMonotonic();
            final int maxTransactionsPerIteration =
                    connectorConfig.getMaxTransactionsPerIteration();

            TxLogPosition lastProcessedPosition =
                    streamingExecutionContext.getLastProcessedPosition();

            if (context.isRunning()) {
                commitTransaction();
                // Upper bound of the LSN window for this iteration.
                final Lsn toLsn =
                        getToLsn(
                                dataConnection,
                                databaseName,
                                lastProcessedPosition,
                                maxTransactionsPerIteration);

                // Shouldn't happen if the agent is running, but it is better to guard against such
                // situation
                if (!toLsn.isAvailable()) {
                    LOGGER.warn(
                            "No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running");
                    return false;
                }
                // There is no change in the database
                if (toLsn.compareTo(lastProcessedPosition.getCommitLsn()) <= 0
                        && streamingExecutionContext.getShouldIncreaseFromLsn()) {
                    LOGGER.debug("No change in the database");
                    return false;
                }

                // Reading interval is inclusive so we need to move LSN forward but not for first
                // run as TX might not be streamed completely
                final Lsn fromLsn =
                        lastProcessedPosition.getCommitLsn().isAvailable()
                                        && streamingExecutionContext.getShouldIncreaseFromLsn()
                                ? dataConnection.incrementLsn(
                                        databaseName, lastProcessedPosition.getCommitLsn())
                                : lastProcessedPosition.getCommitLsn();
                streamingExecutionContext.setShouldIncreaseFromLsn(true);

                // Drain checkpoints left over from a previous iteration before querying.
                while (!schemaChangeCheckpoints.isEmpty()) {
                    migrateTable(partition, schemaChangeCheckpoints, offsetContext);
                }
                // New change tables appeared in the window: refresh the table list and remember
                // those whose start LSN lies inside the window as pending schema changes.
                if (!dataConnection.getNewChangeTables(databaseName, fromLsn, toLsn).isEmpty()) {
                    final SqlServerChangeTable[] tables =
                            getChangeTablesToQuery(partition, offsetContext, toLsn);
                    tablesSlot.set(tables);
                    for (SqlServerChangeTable table : tables) {
                        if (table.getStartLsn().isBetween(fromLsn, toLsn)) {
                            LOGGER.info("Schema will be changed for {}", table);
                            schemaChangeCheckpoints.add(table);
                        }
                    }
                }
                // Lazily populate the table list when the slot is still empty.
                if (tablesSlot.get() == null) {
                    tablesSlot.set(getChangeTablesToQuery(partition, offsetContext, toLsn));
                }
                try {
                    dataConnection.getChangesForTables(
                            databaseName,
                            tablesSlot.get(),
                            fromLsn,
                            toLsn,
                            resultSets -> {
                                // Counts events skipped within the transaction at the start
                                // position, compared against the recorded event serial number.
                                long eventSerialNoInInitialTx = 1;
                                final int tableCount = resultSets.length;
                                final SqlServerChangeTablePointer[] changeTables =
                                        new SqlServerChangeTablePointer[tableCount];
                                final SqlServerChangeTable[] tables = tablesSlot.get();

                                // One pointer per result set; each is advanced once before the
                                // merge loop starts.
                                for (int i = 0; i < tableCount; i++) {
                                    changeTables[i] =
                                            new SqlServerChangeTablePointer(
                                                    tables[i],
                                                    resultSets[i],
                                                    connectorConfig.getSourceTimestampMode());
                                    changeTables[i].next();
                                }

                                // Merge loop: repeatedly take the non-completed pointer that
                                // compares smallest until all pointers are completed.
                                for (; ; ) {
                                    SqlServerChangeTablePointer tableWithSmallestLsn = null;
                                    for (SqlServerChangeTablePointer changeTable : changeTables) {
                                        if (changeTable.isCompleted()) {
                                            continue;
                                        }
                                        if (tableWithSmallestLsn == null
                                                || changeTable.compareTo(tableWithSmallestLsn)
                                                        < 0) {
                                            tableWithSmallestLsn = changeTable;
                                        }
                                    }
                                    if (tableWithSmallestLsn == null) {
                                        // No more LSNs available
                                        break;
                                    }

                                    if (!(tableWithSmallestLsn.getChangePosition().isAvailable()
                                            && tableWithSmallestLsn
                                                    .getChangePosition()
                                                    .getInTxLsn()
                                                    .isAvailable())) {
                                        LOGGER.error(
                                                "Skipping change {} as its LSN is NULL which is not expected",
                                                tableWithSmallestLsn);
                                        tableWithSmallestLsn.next();
                                        continue;
                                    }

                                    if (tableWithSmallestLsn.isNewTransaction()
                                            && changesStoppedBeingMonotonic.get()) {
                                        LOGGER.info(
                                                "Resetting changesStoppedBeingMonotonic as transaction changes");
                                        changesStoppedBeingMonotonic.set(false);
                                    }

                                    // After restart for changes that are not monotonic to avoid
                                    // data loss
                                    if (tableWithSmallestLsn
                                            .isCurrentPositionSmallerThanPreviousPosition()) {
                                        LOGGER.info(
                                                "Disabling skipping changes due to not monotonic order of changes");
                                        changesStoppedBeingMonotonic.set(true);
                                    }

                                    // After restart for changes that were executed before the last
                                    // committed offset
                                    if (!changesStoppedBeingMonotonic.get()
                                            && tableWithSmallestLsn
                                                            .getChangePosition()
                                                            .compareTo(lastProcessedPositionOnStart)
                                                    < 0) {
                                        LOGGER.info(
                                                "Skipping change {} as its position is smaller than the last recorded position {}",
                                                tableWithSmallestLsn,
                                                lastProcessedPositionOnStart);
                                        tableWithSmallestLsn.next();
                                        continue;
                                    }
                                    // After restart for change that was the last committed and
                                    // operations in it before the last committed offset
                                    if (!changesStoppedBeingMonotonic.get()
                                            && tableWithSmallestLsn
                                                            .getChangePosition()
                                                            .compareTo(lastProcessedPositionOnStart)
                                                    == 0
                                            && eventSerialNoInInitialTx
                                                    <= lastProcessedEventSerialNoOnStart) {
                                        LOGGER.info(
                                                "Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]",
                                                tableWithSmallestLsn,
                                                eventSerialNoInInitialTx,
                                                lastProcessedPositionOnStart,
                                                lastProcessedEventSerialNoOnStart);
                                        eventSerialNoInInitialTx++;
                                        tableWithSmallestLsn.next();
                                        continue;
                                    }
                                    // Skip rows whose commit LSN is at or past the change table's
                                    // stop LSN (when a stop LSN is set).
                                    if (tableWithSmallestLsn
                                                    .getChangeTable()
                                                    .getStopLsn()
                                                    .isAvailable()
                                            && tableWithSmallestLsn
                                                            .getChangeTable()
                                                            .getStopLsn()
                                                            .compareTo(
                                                                    tableWithSmallestLsn
                                                                            .getChangePosition()
                                                                            .getCommitLsn())
                                                    <= 0) {
                                        LOGGER.debug(
                                                "Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}",
                                                tableWithSmallestLsn,
                                                tableWithSmallestLsn.getChangePosition());
                                        tableWithSmallestLsn.next();
                                        continue;
                                    }
                                    LOGGER.trace("Processing change {}", tableWithSmallestLsn);
                                    LOGGER.trace(
                                            "Schema change checkpoints {}",
                                            schemaChangeCheckpoints);
                                    // Apply a pending schema change once the stream has reached
                                    // the start LSN of the head checkpoint.
                                    if (!schemaChangeCheckpoints.isEmpty()) {
                                        if (tableWithSmallestLsn
                                                        .getChangePosition()
                                                        .getCommitLsn()
                                                        .compareTo(
                                                                schemaChangeCheckpoints
                                                                        .peek()
                                                                        .getStartLsn())
                                                >= 0) {
                                            migrateTable(
                                                    partition,
                                                    schemaChangeCheckpoints,
                                                    offsetContext);
                                        }
                                    }
                                    final TableId tableId =
                                            tableWithSmallestLsn
                                                    .getChangeTable()
                                                    .getSourceTableId();
                                    final int operation = tableWithSmallestLsn.getOperation();
                                    final Object[] data = tableWithSmallestLsn.getData();

                                    // UPDATE consists of two consecutive events, first event
                                    // contains
                                    // the row before it was updated and the second the row after
                                    // it was updated
                                    int eventCount = 1;
                                    if (operation
                                            == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) {
                                        if (!tableWithSmallestLsn.next()
                                                || tableWithSmallestLsn.getOperation()
                                                        != SqlServerChangeRecordEmitter
                                                                .OP_UPDATE_AFTER) {
                                            throw new IllegalStateException(
                                                    "The update before event at "
                                                            + tableWithSmallestLsn
                                                                    .getChangePosition()
                                                            + " for table "
                                                            + tableId
                                                            + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                                        }
                                        eventCount = 2;
                                    }
                                    // For an UPDATE the pointer now sits on the "after" row, so
                                    // its data is the second half of the pair.
                                    final Object[] dataNext =
                                            (operation
                                                            == SqlServerChangeRecordEmitter
                                                                    .OP_UPDATE_BEFORE)
                                                    ? tableWithSmallestLsn.getData()
                                                    : null;

                                    offsetContext.setChangePosition(
                                            tableWithSmallestLsn.getChangePosition(), eventCount);
                                    offsetContext.event(
                                            tableWithSmallestLsn
                                                    .getChangeTable()
                                                    .getSourceTableId(),
                                            connectorConfig
                                                    .getSourceTimestampMode()
                                                    .getTimestamp(
                                                            clock,
                                                            tableWithSmallestLsn.getResultSet()));

                                    dispatcher.dispatchDataChangeEvent(
                                            partition,
                                            tableId,
                                            new SqlServerChangeRecordEmitter(
                                                    partition,
                                                    offsetContext,
                                                    operation,
                                                    data,
                                                    dataNext,
                                                    clock));
                                    tableWithSmallestLsn.next();
                                }
                            });
                    // The whole window was consumed; the next iteration starts after toLsn.
                    streamingExecutionContext.setLastProcessedPosition(
                            TxLogPosition.valueOf(toLsn));
                    // Terminate the transaction otherwise CDC could not be disabled for tables
                    dataConnection.rollback();
                    // Determine whether to continue streaming in sqlserver cdc snapshot phase
                    afterHandleLsn(partition, toLsn);
                } catch (SQLException e) {
                    // Let the error handler for change table queries decide which tables remain
                    // in the slot for the next iteration.
                    tablesSlot.set(
                            processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
                }
            }
        } catch (Exception e) {
            // Failures are reported through the error handler rather than thrown to the caller.
            errorHandler.setProducerThrowable(e);
        }

        return true;
    }