in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java [138:268]
/**
 * Streams changes by repeatedly mining an SCN window until {@code context} stops running.
 *
 * <p>Returns immediately when the configured snapshot mode does not stream. Otherwise the
 * method disables auto-commit on the JDBC connection, reads the start and snapshot SCN from
 * {@code offsetContext}, and (unless continuous mining is enabled) fails if the start SCN is
 * older than what the available logs contain. It then loops: compute the end SCN, flush via the
 * resolved flush strategy, restart the mining session when required, start a mining session for
 * the window and hand the window to the event processor.
 *
 * <p>Nothing is rethrown from the streaming section: any {@link Throwable} raised inside the
 * outer {@code try} (including the {@code DebeziumException} thrown for a stale start SCN) is
 * logged and passed to {@code errorHandler.setProducerThrowable}. Once that {@code try} has
 * been entered, the SCN range, the streaming metrics and the offsets are logged on exit.
 *
 * @param context consulted via {@code isRunning()} to decide whether to keep mining; also passed
 *     to the processor factory and to the archive-log wait helper
 * @param partition passed to the processor factory and to {@code processor.process}
 * @param offsetContext source of the start SCN and snapshot SCN; also passed to the processor
 *     factory and to {@code endMiningSession}
 */
public void execute(
ChangeEventSourceContext context,
OraclePartition partition,
OracleOffsetContext offsetContext) {
// Nothing to do when the snapshot mode excludes streaming. Note this return happens before
// the try/finally below, so none of the exit logging runs in this case.
if (!connectorConfig.getSnapshotMode().shouldStream()) {
LOGGER.info("Streaming is not enabled in current configuration");
return;
}
try {
// We explicitly expect auto-commit to be disabled
jdbcConnection.setAutoCommit(false);
// startScn and snapshotScn are not declared in this method; they are assigned here and
// startScn is advanced after every processed window.
startScn = offsetContext.getScn();
snapshotScn = offsetContext.getSnapshotScn();
Scn firstScn = getFirstScnInLogs(jdbcConnection);
if (startScn.compareTo(snapshotScn) == 0) {
// This is the initial run of the streaming change event source.
// We need to compute the correct start offset for mining. That is not the snapshot
// offset, but the start offset of the oldest transaction that was still pending when
// the snapshot was taken.
computeStartScnForFirstMiningSession(offsetContext, firstScn);
}
// The flush strategy is a resource: it is closed when this try block exits.
try (LogWriterFlushStrategy flushStrategy = resolveFlushStrategy()) {
if (!isContinuousMining && startScn.compareTo(firstScn.subtract(Scn.ONE)) < 0) {
// startScn is the exclusive lower bound, so must be >= (firstScn - 1)
throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn "
+ startScn
+ ". Please perform a new snapshot.");
}
setNlsSessionParameters(jdbcConnection);
checkDatabaseAndTableState(jdbcConnection, connectorConfig.getPdbName(), schema);
// The processor is also a resource and is closed when streaming ends.
try (LogMinerEventProcessor processor =
createProcessor(context, partition, offsetContext)) {
// In archive-log-only mode, give up before mining anything if the wait helper reports
// false (presumably the context stopped while waiting - confirm against the helper).
if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
return;
}
initializeRedoLogsForMining(jdbcConnection, false, startScn);
// Attempt counter handed to startMiningSession: incremented after each failed start and
// reset to 1 after a successful one.
int retryAttempts = 1;
// Accumulates the time spent in the current mining session; used for the maximum
// session duration check below and replaced whenever the session is restarted.
Stopwatch sw = Stopwatch.accumulating().start();
while (context.isRunning()) {
// Calculate time difference before each mining session to detect time zone
// offset changes (e.g. DST) on database server
streamingMetrics.calculateTimeDifference(
getDatabaseSystemTime(jdbcConnection));
// Same archive-log check as before the loop, but here a false result leaves the loop
// instead of returning directly.
if (archiveLogOnlyMode
&& !waitForStartScnInArchiveLogs(context, startScn)) {
break;
}
// Reference point for the batch processing time metric recorded further down.
Instant start = Instant.now();
endScn = calculateEndScn(jdbcConnection, startScn, endScn);
// There is a small window, when archive-log-only mode has completely caught up to the
// last record in the archive logs, in which the start and end values are identical.
// In that case we pause and restart the loop, waiting for a new archive log before
// proceeding.
if (archiveLogOnlyMode && startScn.equals(endScn)) {
pauseBetweenMiningSessions();
continue;
}
flushStrategy.flush(jdbcConnection.getCurrentScn());
boolean restartRequired = false;
if (connectorConfig.getLogMiningMaximumSession().isPresent()) {
// The stopwatch is stopped to read the accumulated total; it is resumed in the else
// branch when the limit has not been reached yet.
final Duration totalDuration =
sw.stop().durations().statistics().getTotal();
if (totalDuration.toMillis()
>= connectorConfig
.getLogMiningMaximumSession()
.get()
.toMillis()) {
// NOTE(review): the log argument is the value returned by
// getLogMiningMaximumSession() itself, not the unwrapped .get() value used in the
// comparison above, so the message presumably renders the wrapper's toString -
// confirm whether that is intended.
LOGGER.info(
"LogMiner session has exceeded maximum session time of '{}', forcing restart.",
connectorConfig.getLogMiningMaximumSession());
restartRequired = true;
} else {
// resume the existing stop watch, we haven't met the criteria yet
sw.start();
}
}
if (restartRequired || hasLogSwitchOccurred()) {
// This is the way to mitigate PGA leaks.
// With one mining session, it grows and maybe there is another way to flush PGA.
// At this point we use a new mining session
endMiningSession(jdbcConnection, offsetContext);
initializeRedoLogsForMining(jdbcConnection, true, startScn);
// log switch or restart required, re-create a new stop watch
sw = Stopwatch.accumulating().start();
}
// The running state is checked again before a mining session is started.
if (context.isRunning()) {
if (!startMiningSession(
jdbcConnection, startScn, endScn, retryAttempts)) {
retryAttempts++;
} else {
retryAttempts = 1;
// The metric is recorded before processor.process runs, so it covers the time from
// the start of this iteration's window calculation up to the session start only.
streamingMetrics.setCurrentBatchProcessingTime(
Duration.between(start, Instant.now()));
captureSessionMemoryStatistics(jdbcConnection);
// The value returned by the processor becomes the start SCN of the next window.
startScn = processor.process(partition, startScn, endScn);
}
// Pause after both a successful and a failed session start.
pauseBetweenMiningSessions();
}
}
}
}
} catch (Throwable t) {
// Failures are reported through the error handler rather than rethrown to the caller.
logError(streamingMetrics, "Mining session stopped due to the {}", t);
errorHandler.setProducerThrowable(t);
} finally {
// Final state dump for diagnostics; runs for normal exit, early return inside the try, and
// failures alike.
LOGGER.info("startScn={}, endScn={}", startScn, endScn);
LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
LOGGER.info("Offsets: {}", offsetContext);
}
}