in inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java [138:337]
public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) {
try (TransactionalBuffer transactionalBuffer =
new TransactionalBuffer(
connectorConfig, schema, clock, errorHandler, streamingMetrics)) {
try {
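// Resume from the offset SCN and verify it is still covered by the
// online redo logs or the retained archive logs.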
startScn = offsetContext.getScn();
if (!isContinuousMining
&& startScn.compareTo(
getFirstOnlineLogScn(
jdbcConnection,
archiveLogRetention,
archiveDestinationName)) < 0) {
throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn "
+ startScn
+ ". Please perform a new snapshot.");
}
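// Configure session NLS parameters and verify that supplemental logging
// is enabled for the captured tables.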
setNlsSessionParameters(jdbcConnection);
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
return;
}
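// Register the redo/archive logs for the first mining session and, depending
// on the mining strategy, build the data dictionary.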
initializeRedoLogsForMining(jdbcConnection, false, startScn);
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
try {
// todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(
streamingMetrics,
jdbcConfiguration,
connectorConfig.getLogMinerHistoryRetentionHours());
final LogMinerQueryResultProcessor processor =
new LogMinerQueryResultProcessor(
context,
connectorConfig,
streamingMetrics,
transactionalBuffer,
offsetContext,
schema,
dispatcher,
historyRecorder);
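// Build the query against the LogMiner contents view; the cursor is held
// over commits so the same prepared statement can be re-executed per batch.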
final String query =
LogMinerQueryBuilder.build(connectorConfig, schema);
try (PreparedStatement miningView =
jdbcConnection
.connection()
.prepareStatement(
query,
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT)) {
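// Capture the current online redo log sequence numbers so that log
// switches can be detected later via hasLogSwitchOccurred().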
currentRedoLogSequences = getCurrentRedoLogSequences();
Stopwatch stopwatch = Stopwatch.reusable();
while (context.isRunning()) {
// Calculate the time difference before each mining session to detect
// time zone offset changes (e.g. DST) on the database server
streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));
if (archiveLogOnlyMode
&& !waitForStartScnInArchiveLogs(context, startScn)) {
break;
}
Instant start = Instant.now();
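// Compute the upper bound SCN for this mining iteration based on the
// configured batch size and the current database state.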
endScn =
getEndScn(
jdbcConnection,
startScn,
endScn,
streamingMetrics,
connectorConfig.getLogMiningBatchSizeDefault(),
connectorConfig.isLobEnabled(),
connectorConfig.isArchiveLogOnlyMode(),
connectorConfig.getLogMiningArchiveDestinationName());
// There is a small window, once archive-log-only mode has completely
// caught up to the last record in the archive logs, where the start and
// end SCN values are identical. In that case we pause and restart the
// loop, waiting for a new archive log before proceeding.
if (archiveLogOnlyMode && startScn.equals(endScn)) {
pauseBetweenMiningSessions();
continue;
}
if (hasLogSwitchOccurred()) {
// Starting a fresh mining session mitigates PGA memory leaks: with a
// single long-lived session the PGA keeps growing, and while there may
// be another way to flush it, for now we simply open a new session here.
LOGGER.trace(
"Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
startScn,
endScn,
offsetContext.getScn(),
strategy,
isContinuousMining);
endMining(jdbcConnection);
initializeRedoLogsForMining(jdbcConnection, true, startScn);
abandonOldTransactionsIfExist(
jdbcConnection, offsetContext, transactionalBuffer);
// This needs to be re-calculated because building the data dictionary
// forces the current redo log sequence to advance due to a complete
// log switch of all logs.
currentRedoLogSequences = getCurrentRedoLogSequences();
}
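// Start a LogMiner session covering the startScn..endScn range using the
// configured mining strategy.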
startLogMining(
jdbcConnection,
startScn,
endScn,
strategy,
isContinuousMining,
streamingMetrics);
LOGGER.trace(
"Fetching LogMiner view results SCN {} to {}",
startScn,
endScn);
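// Bind the SCN range and execute the mining view query with a forward-only
// fetch sized to the connector's maximum queue size.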
stopwatch.start();
miningView.setFetchSize(connectorConfig.getMaxQueueSize());
miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
miningView.setString(1, startScn.toString());
miningView.setString(2, endScn.toString());
try (ResultSet rs = miningView.executeQuery()) {
Duration lastDurationOfBatchCapturing =
stopwatch.stop().durations().statistics().getTotal();
streamingMetrics.setLastDurationOfBatchCapturing(
lastDurationOfBatchCapturing);
processor.processResult(rs);
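// When LOB support is enabled, offsets are driven by the transactional
// buffer; otherwise they are derived from the last processed SCN and the
// buffer contents below.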
if (connectorConfig.isLobEnabled()) {
startScn =
transactionalBuffer.updateOffsetContext(
offsetContext, dispatcher);
} else {
final Scn lastProcessedScn = processor.getLastProcessedScn();
if (!lastProcessedScn.isNull()
&& lastProcessedScn.compareTo(endScn) < 0) {
// If the last processed SCN is before the endScn, use the last
// processed SCN as the next starting point, because the LGWR
// buffer has not yet flushed all entries from memory to disk.
endScn = lastProcessedScn;
}
if (transactionalBuffer.isEmpty()) {
LOGGER.debug(
"Buffer is empty, updating offset SCN to {}",
endScn);
offsetContext.setScn(endScn);
} else {
final Scn minStartScn = transactionalBuffer.getMinimumScn();
if (!minStartScn.isNull()) {
offsetContext.setScn(
minStartScn.subtract(Scn.valueOf(1)));
dispatcher.dispatchHeartbeatEvent(offsetContext);
}
}
startScn = endScn;
}
}
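// Post-processing hook invoked after this batch's SCN range has been handled.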
afterHandleScn(offsetContext);
streamingMetrics.setCurrentBatchProcessingTime(
Duration.between(start, Instant.now()));
pauseBetweenMiningSessions();
}
}
} finally {
historyRecorder.close();
}
} catch (Throwable t) {
logError(streamingMetrics, "Mining session stopped due to error {}", t);
errorHandler.setProducerThrowable(t);
} finally {
LOGGER.info(
"startScn={}, endScn={}, offsetContext.getScn()={}",
startScn,
endScn,
offsetContext.getScn());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
}
}
}