in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java [751:833]
/**
 * Chooses the upper SCN bound for the next mining iteration and, along the way, asks the
 * streaming metrics to retune the batch size and sleep time.
 *
 * <p>The "current" SCN is the maximum archive log SCN when running in archive-log-only mode,
 * otherwise the SCN reported by the connection. The candidate end SCN is {@code startScn} plus
 * the batch size currently held by the streaming metrics. The current SCN is returned instead
 * of the candidate when any of the following holds:
 *
 * <ul>
 *   <li>the current SCN is below the candidate;
 *   <li>the candidate does not go past {@code prevEndScn};
 *   <li>the candidate is below {@code startScn};
 *   <li>the SCN distance between {@code prevEndScn} and the current SCN exceeds the configured
 *       minimum gap size while the time between their timestamps is below the configured
 *       maximum interval (logged as a possible SCN gap).
 * </ul>
 *
 * @param connection connection used to read the current SCN and to map SCNs to timestamps
 * @param startScn SCN at which the next mining range starts
 * @param prevEndScn end SCN of the previous iteration; may be {@code null}, in which case the
 *     checks that depend on it are skipped
 * @return the SCN to use as the end of the mining range
 * @throws SQLException propagated from the underlying connection calls
 */
private Scn calculateEndScn(OracleConnection connection, Scn startScn, Scn prevEndScn)
        throws SQLException {
    final Scn currentScn;
    if (archiveLogOnlyMode) {
        currentScn = getMaxArchiveLogScn(currentLogFiles);
    } else {
        currentScn = connection.getCurrentScn();
    }
    streamingMetrics.setCurrentScn(currentScn);

    // Candidate upper bound: the start SCN advanced by the current batch size.
    final Scn batchSpan = Scn.valueOf(streamingMetrics.getBatchSize());
    final Scn candidateEndScn = startScn.add(batchSpan);

    // Batch size tuning. Both checks compare against the batch size captured above, so the
    // first adjustment does not influence the second comparison.
    // NOTE(review): the boolean passed to changeBatchSize presumably means "increase" -- confirm.
    final boolean candidateFarAhead =
            candidateEndScn.subtract(currentScn).compareTo(batchSpan) > 0;
    if (candidateFarAhead) {
        streamingMetrics.changeBatchSize(false, connectorConfig.isLobEnabled());
    }
    if (currentScn.subtract(candidateEndScn).compareTo(batchSpan) > 0) {
        streamingMetrics.changeBatchSize(true, connectorConfig.isLobEnabled());
    }

    // The database has not reached the candidate yet: mine up to the current SCN only.
    if (currentScn.compareTo(candidateEndScn) < 0) {
        // Sleep time is only touched when the batch size was not already adjusted above.
        // NOTE(review): changeSleepingTime(true) presumably lengthens the sleep -- confirm.
        if (!candidateFarAhead) {
            streamingMetrics.changeSleepingTime(true);
        }
        LOGGER.debug("Using current SCN {} as end SCN.", currentScn);
        return currentScn;
    }

    final boolean hasPrevEndScn = prevEndScn != null;

    // The candidate would not advance beyond the previous end SCN.
    if (hasPrevEndScn && candidateEndScn.compareTo(prevEndScn) <= 0) {
        LOGGER.debug("Max batch size too small, using current SCN {} as end SCN.", currentScn);
        return currentScn;
    }

    streamingMetrics.changeSleepingTime(false);

    if (candidateEndScn.compareTo(startScn) < 0) {
        LOGGER.debug(
                "Top SCN calculation resulted in end before start SCN, using current SCN {} as end SCN.",
                currentScn);
        return currentScn;
    }

    // SCN gap detection: a large SCN jump since the previous end SCN within a short time span.
    if (hasPrevEndScn
            && currentScn
                            .subtract(prevEndScn)
                            .compareTo(
                                    Scn.valueOf(
                                            connectorConfig
                                                    .getLogMiningScnGapDetectionGapSizeMin()))
                    > 0) {
        final Optional<OffsetDateTime> prevEndTime = connection.getScnToTimestamp(prevEndScn);
        // The second lookup is only issued when the first one produced a timestamp.
        final Optional<OffsetDateTime> currentTime =
                prevEndTime.isPresent()
                        ? connection.getScnToTimestamp(currentScn)
                        : Optional.<OffsetDateTime>empty();
        if (prevEndTime.isPresent() && currentTime.isPresent()) {
            final long elapsedMs =
                    ChronoUnit.MILLIS.between(prevEndTime.get(), currentTime.get());
            if (elapsedMs < connectorConfig.getLogMiningScnGapDetectionTimeIntervalMaxMs()) {
                LOGGER.warn(
                        "Detected possible SCN gap, using current SCN, startSCN {}, prevEndScn {} timestamp {}, current SCN {} timestamp {}.",
                        startScn,
                        prevEndScn,
                        prevEndTime.get(),
                        currentScn,
                        currentTime.get());
                return currentScn;
            }
        }
    }

    LOGGER.debug(
            "Using Top SCN calculation {} as end SCN. currentScn {}, startScn {}",
            candidateEndScn,
            currentScn,
            startScn);
    return candidateEndScn;
}