public void execute()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java [138:268]


    public void execute(
            ChangeEventSourceContext context,
            OraclePartition partition,
            OracleOffsetContext offsetContext) {
        if (!connectorConfig.getSnapshotMode().shouldStream()) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return;
        }
        try {
            // We explicitly expect auto-commit to be disabled
            jdbcConnection.setAutoCommit(false);

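            // Resume from the SCN recorded in the offsets; the snapshot SCN marks
            // the point at which the initial snapshot completed.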
            startScn = offsetContext.getScn();
            snapshotScn = offsetContext.getSnapshotScn();
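            // Determine the oldest SCN that is still available in the online redo
            // or archive logs.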
            Scn firstScn = getFirstScnInLogs(jdbcConnection);
            if (startScn.compareTo(snapshotScn) == 0) {
                // This is the initial run of the streaming change event source.
                // We need to compute the correct start offset for mining. That is
                // not the snapshot offset, but the start offset of the oldest
                // transaction that was still pending when the snapshot was taken.
                computeStartScnForFirstMiningSession(offsetContext, firstScn);
            }

            try (LogWriterFlushStrategy flushStrategy = resolveFlushStrategy()) {
                if (!isContinuousMining && startScn.compareTo(firstScn.subtract(Scn.ONE)) < 0) {
                    // startScn is the exclusive lower bound, so must be >= (firstScn - 1)
                    throw new DebeziumException(
                            "Online REDO LOG files or archive log files do not contain the offset SCN "
                                    + startScn
                                    + ". Please perform a new snapshot.");
                }

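                // Configure NLS session parameters for consistent date/time handling
                // and verify that the database and captured tables are in a state
                // suitable for streaming before mining begins.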
                setNlsSessionParameters(jdbcConnection);
                checkDatabaseAndTableState(jdbcConnection, connectorConfig.getPdbName(), schema);

                try (LogMinerEventProcessor processor =
                        createProcessor(context, partition, offsetContext)) {

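                    // In archive-log-only mode, wait until startScn is available in
                    // an archived log; give up if the connector stops while waiting.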
                    if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
                        return;
                    }

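                    // Register the redo/archive logs that contain startScn with the
                    // upcoming LogMiner session.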
                    initializeRedoLogsForMining(jdbcConnection, false, startScn);

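                    // retryAttempts counts consecutive failures to start a mining
                    // session; the accumulating stopwatch tracks total session time
                    // for the optional maximum-session restart below.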
                    int retryAttempts = 1;
                    Stopwatch sw = Stopwatch.accumulating().start();
                    while (context.isRunning()) {
                        // Calculate time difference before each mining session to detect time zone
                        // offset changes (e.g. DST) on database server
                        streamingMetrics.calculateTimeDifference(
                                getDatabaseSystemTime(jdbcConnection));

                        if (archiveLogOnlyMode
                                && !waitForStartScnInArchiveLogs(context, startScn)) {
                            break;
                        }

                        Instant start = Instant.now();
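                        // Compute the upper bound of the next mining window.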
                        endScn = calculateEndScn(jdbcConnection, startScn, endScn);

                        // There is a small window in archive-log-only mode where,
                        // once mining has completely caught up to the last record in
                        // the archive logs, the start and end SCN values are
                        // identical. In that case we want to pause and restart the
                        // loop, waiting for a new archive log before proceeding.
                        if (archiveLogOnlyMode && startScn.equals(endScn)) {
                            pauseBetweenMiningSessions();
                            continue;
                        }

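                        // Flush the log writer so that changes up to the current SCN
                        // are available to the mining session.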
                        flushStrategy.flush(jdbcConnection.getCurrentScn());

                        boolean restartRequired = false;
                        if (connectorConfig.getLogMiningMaximumSession().isPresent()) {
                            final Duration totalDuration =
                                    sw.stop().durations().statistics().getTotal();
                            if (totalDuration.toMillis()
                                    >= connectorConfig
                                            .getLogMiningMaximumSession()
                                            .get()
                                            .toMillis()) {
                                LOGGER.info(
                                        "LogMiner session has exceeded maximum session time of '{}', forcing restart.",
                                        connectorConfig.getLogMiningMaximumSession());
                                restartRequired = true;
                            } else {
                                // resume the existing stop watch, we haven't met the criteria yet
                                sw.start();
                            }
                        }

                        if (restartRequired || hasLogSwitchOccurred()) {
                            // Creating a new mining session is how we mitigate PGA
                            // memory leaks: with a single long-lived mining session,
                            // PGA usage keeps growing. There may be another way to
                            // flush the PGA, but at this point we start a new
                            // mining session instead.
                            endMiningSession(jdbcConnection, offsetContext);
                            initializeRedoLogsForMining(jdbcConnection, true, startScn);

                            // A log switch occurred or a restart was required, so
                            // create a fresh stop watch
                            sw = Stopwatch.accumulating().start();
                        }

                        if (context.isRunning()) {
                            if (!startMiningSession(
                                    jdbcConnection, startScn, endScn, retryAttempts)) {
                                retryAttempts++;
                            } else {
                                retryAttempts = 1;
                                streamingMetrics.setCurrentBatchProcessingTime(
                                        Duration.between(start, Instant.now()));
                                captureSessionMemoryStatistics(jdbcConnection);
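                                // Process all events in the (startScn, endScn]
                                // window; the returned SCN becomes the next window's
                                // exclusive lower bound.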
                                startScn = processor.process(partition, startScn, endScn);
                            }
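                            // Throttle before starting the next mining window.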
                            pauseBetweenMiningSessions();
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logError(streamingMetrics, "Mining session stopped due to error: {}", t);
            errorHandler.setProducerThrowable(t);
        } finally {
            LOGGER.info("startScn={}, endScn={}", startScn, endScn);
            LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
            LOGGER.info("Offsets: {}", offsetContext);
        }
    }
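
The maximum-session restart logic above is the subtlest part of the loop. Below is a minimal, self-contained sketch of that rule, assuming only java.time; the class name, the ceiling value, and the Thread.sleep call are hypothetical stand-ins for illustration, not part of the connector:

import java.time.Duration;
import java.time.Instant;

public class MaxSessionRestartSketch {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical ceiling; stands in for connectorConfig.getLogMiningMaximumSession().
        final Duration maxSession = Duration.ofMillis(120);
        // Plays the role of the accumulating Stopwatch in the method above.
        Duration accumulated = Duration.ZERO;
        for (int window = 0; window < 10; window++) {
            Instant start = Instant.now();
            Thread.sleep(25); // stands in for one mining window of real work
            accumulated = accumulated.plus(Duration.between(start, Instant.now()));
            if (accumulated.compareTo(maxSession) >= 0) {
                // Mirrors ending the mining session and re-creating the stop watch
                // in the source above.
                System.out.println("forcing session restart after window " + window);
                accumulated = Duration.ZERO;
            }
        }
    }
}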