private Scn calculateEndScn()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java [751:833]


    private Scn calculateEndScn(OracleConnection connection, Scn startScn, Scn prevEndScn)
            throws SQLException {
        Scn currentScn =
                archiveLogOnlyMode
                        ? getMaxArchiveLogScn(currentLogFiles)
                        : connection.getCurrentScn();
        streamingMetrics.setCurrentScn(currentScn);

        // Add the current batch size to the starting system change number
        final Scn currentBatchSizeScn = Scn.valueOf(streamingMetrics.getBatchSize());
        Scn topScnToMine = startScn.add(currentBatchSizeScn);

        // Control adjusting batch size
        boolean topMiningScnInFarFuture = false;
        if (topScnToMine.subtract(currentScn).compareTo(currentBatchSizeScn) > 0) {
            streamingMetrics.changeBatchSize(false, connectorConfig.isLobEnabled());
            topMiningScnInFarFuture = true;
        }
        if (currentScn.subtract(topScnToMine).compareTo(currentBatchSizeScn) > 0) {
            streamingMetrics.changeBatchSize(true, connectorConfig.isLobEnabled());
        }

        // Control sleep time to reduce database impact
        if (currentScn.compareTo(topScnToMine) < 0) {
            if (!topMiningScnInFarFuture) {
                streamingMetrics.changeSleepingTime(true);
            }
            LOGGER.debug("Using current SCN {} as end SCN.", currentScn);
            return currentScn;
        } else {
            if (prevEndScn != null && topScnToMine.compareTo(prevEndScn) <= 0) {
                LOGGER.debug(
                        "Max batch size too small, using current SCN {} as end SCN.", currentScn);
                return currentScn;
            }
            streamingMetrics.changeSleepingTime(false);
            if (topScnToMine.compareTo(startScn) < 0) {
                LOGGER.debug(
                        "Top SCN calculation resulted in end before start SCN, using current SCN {} as end SCN.",
                        currentScn);
                return currentScn;
            }

            if (prevEndScn != null) {
                final Scn deltaScn = currentScn.subtract(prevEndScn);
                if (deltaScn.compareTo(
                                Scn.valueOf(
                                        connectorConfig.getLogMiningScnGapDetectionGapSizeMin()))
                        > 0) {
                    Optional<OffsetDateTime> prevEndScnTimestamp =
                            connection.getScnToTimestamp(prevEndScn);
                    if (prevEndScnTimestamp.isPresent()) {
                        Optional<OffsetDateTime> currentScnTimestamp =
                                connection.getScnToTimestamp(currentScn);
                        if (currentScnTimestamp.isPresent()) {
                            long timeDeltaMs =
                                    ChronoUnit.MILLIS.between(
                                            prevEndScnTimestamp.get(), currentScnTimestamp.get());
                            if (timeDeltaMs
                                    < connectorConfig
                                            .getLogMiningScnGapDetectionTimeIntervalMaxMs()) {
                                LOGGER.warn(
                                        "Detected possible SCN gap, using current SCN, startSCN {}, prevEndScn {} timestamp {}, current SCN {} timestamp {}.",
                                        startScn,
                                        prevEndScn,
                                        prevEndScnTimestamp.get(),
                                        currentScn,
                                        currentScnTimestamp.get());
                                return currentScn;
                            }
                        }
                    }
                }
            }

            LOGGER.debug(
                    "Using Top SCN calculation {} as end SCN. currentScn {}, startScn {}",
                    topScnToMine,
                    currentScn,
                    startScn);
            return topScnToMine;
        }
    }