public void execute()

in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java [138:337]


    /**
     * Runs the LogMiner streaming loop, beginning at the SCN stored in {@code offsetContext}.
     *
     * <p>Outline of what the body does:
     *
     * <ol>
     *   <li>Reads the start SCN from the offset and, when continuous mining is off, fails if that
     *       SCN is lower than the value returned by {@code getFirstOnlineLogScn}.
     *   <li>Prepares the JDBC session ({@code setNlsSessionParameters},
     *       {@code checkSupplementalLogging}, {@code initializeRedoLogsForMining}) and the
     *       history recorder.
     *   <li>Prepares the mining query once and, while {@code context.isRunning()}, repeatedly:
     *       determines an end SCN, restarts the mining session if a log switch was detected,
     *       starts LogMiner for the current SCN pair, executes the query, hands the result set
     *       to a {@code LogMinerQueryResultProcessor}, and then advances the offset SCN and the
     *       start SCN for the next iteration.
     * </ol>
     *
     * <p>Error handling: any {@link Throwable} raised inside the inner {@code try} (including the
     * {@code DebeziumException} for a missing offset SCN) is logged and passed to
     * {@code errorHandler.setProducerThrowable}; it is not rethrown from there. On every exit path
     * of the inner {@code try} the current SCN values, the transactional buffer and the streaming
     * metrics are logged at INFO level. The {@code TransactionalBuffer} is closed by the outer
     * try-with-resources and the history recorder by its own {@code finally}.
     *
     * <p>Note: {@code startScn} and {@code endScn} are not local to this method; they are
     * variables declared outside it and are updated as the loop progresses.
     *
     * @param context change event source context; its {@code isRunning()} flag controls the
     *     mining loop, and it is also passed to the result processor and to
     *     {@code waitForStartScnInArchiveLogs}
     * @param offsetContext offset holder that supplies the initial SCN and receives the updated
     *     SCN after each processed batch
     */
    public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) {
        try (TransactionalBuffer transactionalBuffer =
                new TransactionalBuffer(
                        connectorConfig, schema, clock, errorHandler, streamingMetrics)) {
            try {
                // Resume from the SCN recorded in the offset.
                startScn = offsetContext.getScn();

                // Without continuous mining, the start SCN must not be older than the SCN
                // reported by getFirstOnlineLogScn; otherwise streaming cannot resume and a new
                // snapshot is requested via the exception message.
                if (!isContinuousMining
                        && startScn.compareTo(
                                getFirstOnlineLogScn(
                                        jdbcConnection,
                                        archiveLogRetention,
                                        archiveDestinationName)) < 0) {
                    throw new DebeziumException(
                            "Online REDO LOG files or archive log files do not contain the offset scn "
                                    + startScn
                                    + ".  Please perform a new snapshot.");
                }

                setNlsSessionParameters(jdbcConnection);
                checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);

                // In archive-log-only mode, a false result from the wait ends the method before
                // any mining starts (the finally block below still logs the state).
                // NOTE(review): presumably false means the context stopped while waiting —
                // confirm in waitForStartScnInArchiveLogs.
                if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
                    return;
                }

                initializeRedoLogsForMining(jdbcConnection, false, startScn);

                HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();

                try {
                    // todo: why can't OracleConnection be used rather than a
                    // Factory+JdbcConfiguration?
                    historyRecorder.prepare(
                            streamingMetrics,
                            jdbcConfiguration,
                            connectorConfig.getLogMinerHistoryRetentionHours());

                    final LogMinerQueryResultProcessor processor =
                            new LogMinerQueryResultProcessor(
                                    context,
                                    connectorConfig,
                                    streamingMetrics,
                                    transactionalBuffer,
                                    offsetContext,
                                    schema,
                                    dispatcher,
                                    historyRecorder);

                    // The query text is built once; the statement is prepared once and re-executed
                    // on every loop iteration with new SCN parameters. The result set is
                    // forward-only, read-only and requested with cursors held over commit.
                    final String query =
                            LogMinerQueryBuilder.build(connectorConfig, schema);
                    try (PreparedStatement miningView =
                            jdbcConnection
                                    .connection()
                                    .prepareStatement(
                                            query,
                                            ResultSet.TYPE_FORWARD_ONLY,
                                            ResultSet.CONCUR_READ_ONLY,
                                            ResultSet.HOLD_CURSORS_OVER_COMMIT)) {

                        // Baseline used by hasLogSwitchOccurred() — presumably compared against
                        // the current sequences on each iteration; confirm in that helper.
                        currentRedoLogSequences = getCurrentRedoLogSequences();
                        Stopwatch stopwatch = Stopwatch.reusable();
                        while (context.isRunning()) {
                            // Calculate time difference before each mining session to detect time
                            // zone offset changes (e.g. DST) on database server
                            streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));

                            // Same wait as before the loop; here a false result leaves the loop
                            // instead of returning, so the enclosing resources are closed normally.
                            if (archiveLogOnlyMode
                                    && !waitForStartScnInArchiveLogs(context, startScn)) {
                                break;
                            }

                            // Start of the wall-clock measurement reported through
                            // setCurrentBatchProcessingTime at the end of the iteration.
                            Instant start = Instant.now();
                            // The previous endScn is passed in along with startScn when
                            // computing the end SCN for this iteration.
                            endScn =
                                    getEndScn(
                                            jdbcConnection,
                                            startScn,
                                            endScn,
                                            streamingMetrics,
                                            connectorConfig.getLogMiningBatchSizeDefault(),
                                            connectorConfig.isLobEnabled(),
                                            connectorConfig.isArchiveLogOnlyMode(),
                                            connectorConfig.getLogMiningArchiveDestinationName());

                            // There is a small window in archive-log-only mode where mining has
                            // completely caught up to the last record in the archive logs, so the
                            // start and end SCN are identical. In that case pause and restart the
                            // loop, waiting for a new archive log before proceeding.
                            if (archiveLogOnlyMode && startScn.equals(endScn)) {
                                pauseBetweenMiningSessions();
                                continue;
                            }

                            if (hasLogSwitchOccurred()) {
                                // This is the way to mitigate PGA leaks.
                                // With one mining session, it grows and maybe there is another way
                                // to flush PGA.
                                // At this point we use a new mining session
                                LOGGER.trace(
                                        "Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
                                        startScn,
                                        endScn,
                                        offsetContext.getScn(),
                                        strategy,
                                        isContinuousMining);
                                endMining(jdbcConnection);

                                initializeRedoLogsForMining(jdbcConnection, true, startScn);

                                abandonOldTransactionsIfExist(
                                        jdbcConnection, offsetContext, transactionalBuffer);

                                // This needs to be re-calculated because building the data
                                // dictionary will force the
                                // current redo log sequence to be advanced due to a complete log
                                // switch of all logs.
                                currentRedoLogSequences = getCurrentRedoLogSequences();
                            }

                            startLogMining(
                                    jdbcConnection,
                                    startScn,
                                    endScn,
                                    strategy,
                                    isContinuousMining,
                                    streamingMetrics);

                            LOGGER.trace(
                                    "Fetching LogMiner view results SCN {} to {}",
                                    startScn,
                                    endScn);
                            // The stopwatch spans parameter binding plus executeQuery(); it is
                            // stopped as soon as the result set is available, before processing.
                            stopwatch.start();
                            // Fetch size follows the configured maximum queue size.
                            miningView.setFetchSize(connectorConfig.getMaxQueueSize());
                            miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
                            // The two query parameters are the SCN bounds, bound as strings.
                            miningView.setString(1, startScn.toString());
                            miningView.setString(2, endScn.toString());
                            try (ResultSet rs = miningView.executeQuery()) {
                                Duration lastDurationOfBatchCapturing =
                                        stopwatch.stop().durations().statistics().getTotal();
                                streamingMetrics.setLastDurationOfBatchCapturing(
                                        lastDurationOfBatchCapturing);
                                processor.processResult(rs);
                                if (connectorConfig.isLobEnabled()) {
                                    // With LOB support enabled, the transactional buffer updates
                                    // the offset and returns the SCN to start the next iteration
                                    // from.
                                    startScn =
                                            transactionalBuffer.updateOffsetContext(
                                                    offsetContext, dispatcher);
                                } else {

                                    final Scn lastProcessedScn = processor.getLastProcessedScn();
                                    if (!lastProcessedScn.isNull()
                                            && lastProcessedScn.compareTo(endScn) < 0) {
                                        // If the last processed SCN is before the endScn we need to
                                        // use the last processed SCN as the
                                        // next starting point as the LGWR buffer didn't flush all
                                        // entries from memory to disk yet.
                                        endScn = lastProcessedScn;
                                    }

                                    if (transactionalBuffer.isEmpty()) {
                                        // No buffered transactions: the offset can move all the
                                        // way to the (possibly lowered) end SCN.
                                        LOGGER.debug(
                                                "Buffer is empty, updating offset SCN to {}",
                                                endScn);
                                        offsetContext.setScn(endScn);
                                    } else {
                                        // Transactions are still buffered: set the offset to one
                                        // below the buffer's minimum SCN and emit a heartbeat
                                        // carrying that offset.
                                        final Scn minStartScn = transactionalBuffer.getMinimumScn();
                                        if (!minStartScn.isNull()) {
                                            offsetContext.setScn(
                                                    minStartScn.subtract(Scn.valueOf(1)));
                                            dispatcher.dispatchHeartbeatEvent(offsetContext);
                                        }
                                    }
                                    // The next iteration starts where this one ended, regardless
                                    // of the offset SCN chosen above.
                                    startScn = endScn;
                                }
                            }

                            // Hook invoked after the SCN bookkeeping for this batch.
                            // NOTE(review): its behavior is defined outside this method — confirm
                            // what it does with the offset.
                            afterHandleScn(offsetContext);
                            streamingMetrics.setCurrentBatchProcessingTime(
                                    Duration.between(start, Instant.now()));
                            pauseBetweenMiningSessions();
                        }
                    }
                } finally {
                    // Close the history recorder whether the loop ended normally or by exception.
                    historyRecorder.close();
                }
            } catch (Throwable t) {
                // Failures are logged and handed to the error handler instead of being rethrown
                // from this method.
                logError(streamingMetrics, "Mining session stopped due to the {}", t);
                errorHandler.setProducerThrowable(t);
            } finally {
                // Always log the final state for diagnostics, including after an early return.
                LOGGER.info(
                        "startScn={}, endScn={}, offsetContext.getScn()={}",
                        startScn,
                        endScn,
                        offsetContext.getScn());
                LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
                LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
            }
        }
    }