in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java [362:508]
/**
 * Handles a commit row: replays the cached events of the committed transaction through a
 * {@link TransactionCommitConsumer}, then dispatches either a transaction-committed event or a
 * heartbeat, and finally updates the cache and the metrics.
 *
 * <p>The method returns without dispatching anything when
 *
 * <ul>
 *   <li>the transaction id is reported as recently processed,
 *   <li>no transaction with that id is found in the cache, or
 *   <li>the commit SCN tracked in the offsets reports this commit as already handled (the
 *       transaction and its events are then removed from the cache).
 * </ul>
 *
 * <p>If the context stops running while events are being dispatched, the method returns
 * immediately; the commit finalization and metric updates at the end are not executed.
 *
 * @param partition the partition handed to the record emitters and to the dispatcher
 * @param row the commit row; supplies the transaction id, commit SCN, redo thread and change time
 * @throws InterruptedException if event dispatching is interrupted
 */
protected void handleCommit(OraclePartition partition, LogMinerEventRow row)
        throws InterruptedException {
    final String txId = row.getTransactionId();
    if (isRecentlyProcessed(txId)) {
        LOGGER.debug("\tTransaction is already committed, skipped.");
        return;
    }

    final T committedTx = getAndRemoveTransactionFromCache(txId);
    if (committedTx == null) {
        LOGGER.trace("Transaction {} not found, commit skipped.", txId);
        return;
    }

    // Smallest SCN still held by the transaction cache; reported as -1 when there is none.
    final Scn minCachedScn = getTransactionCacheMinimumScn();
    if (minCachedScn.isNull()) {
        metrics.setOldestScn(Scn.valueOf(-1));
    } else {
        metrics.setOldestScn(minCachedScn);
    }

    final Scn commitScn = row.getScn();
    if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
        final Scn lastSeenCommitScn =
                offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
        LOGGER.debug(
                "Transaction {} has already been processed. "
                        + "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.",
                txId,
                offsetContext.getCommitScn(),
                commitScn,
                lastSeenCommitScn);
        removeTransactionAndEventsFromCache(committedTx);
        metrics.setActiveTransactions(getTransactionCache().size());
        return;
    }

    counters.commitCount++;

    final int eventCount = getTransactionEventCount(committedTx);
    LOGGER.trace("Commit (smallest SCN {}) {}", minCachedScn, row);
    LOGGER.trace("Transaction {} has {} events", txId, eventCount);

    final ZoneOffset dbOffset = metrics.getDatabaseOffset();
    final boolean userExcluded = isTransactionUserExcluded(committedTx);

    // Per-event callback: moves the offsets forward and, unless the transaction's user is
    // excluded, emits the change record for the event.
    final TransactionCommitConsumer.Handler<LogMinerEvent> eventHandler =
            new TransactionCommitConsumer.Handler<LogMinerEvent>() {
                // Event count taken when the handler is created; used to spot the last event.
                private int expectedEventCount = getTransactionEventCount(committedTx);

                @Override
                public void accept(LogMinerEvent event, long eventsProcessed)
                        throws InterruptedException {
                    // Only advance the offset SCN when this commit lies below the SCN of every
                    // transaction still in the cache (or the cache has no minimum at all).
                    if (minCachedScn.isNull() || commitScn.compareTo(minCachedScn) < 0) {
                        offsetContext.setScn(event.getScn());
                        metrics.setOldestScn(event.getScn());
                    }

                    offsetContext.setEventScn(event.getScn());
                    offsetContext.setTransactionId(txId);
                    offsetContext.setSourceTime(
                            event.getChangeTime().minusSeconds(dbOffset.getTotalSeconds()));
                    offsetContext.setTableId(event.getTableId());
                    offsetContext.setRedoThread(row.getThread());

                    if (eventsProcessed == expectedEventCount) {
                        // Last event of the transaction: record the commit in the offsets.
                        offsetContext.getCommitScn().recordCommit(row);
                    }

                    // The cast happens before the exclusion check, so it applies to every event.
                    final DmlEvent dmlEvent = (DmlEvent) event;
                    if (userExcluded) {
                        return;
                    }

                    final LogMinerChangeRecordEmitter emitter;
                    if (dmlEvent instanceof TruncateEvent) {
                        // LogMiner reports a truncate as a DDL event type, so the operation
                        // is forced to TRUNCATE here.
                        emitter =
                                new LogMinerChangeRecordEmitter(
                                        connectorConfig,
                                        partition,
                                        offsetContext,
                                        Envelope.Operation.TRUNCATE,
                                        dmlEvent.getDmlEntry().getOldValues(),
                                        dmlEvent.getDmlEntry().getNewValues(),
                                        getSchema().tableFor(event.getTableId()),
                                        getSchema(),
                                        Clock.system(),
                                        dmlEvent.getRowId());
                    } else {
                        emitter =
                                new LogMinerChangeRecordEmitter(
                                        connectorConfig,
                                        partition,
                                        offsetContext,
                                        dmlEvent.getEventType(),
                                        dmlEvent.getDmlEntry().getOldValues(),
                                        dmlEvent.getDmlEntry().getNewValues(),
                                        getSchema().tableFor(event.getTableId()),
                                        getSchema(),
                                        Clock.system(),
                                        dmlEvent.getRowId());
                    }
                    dispatcher.dispatchDataChangeEvent(partition, event.getTableId(), emitter);
                }
            };

    final Instant dispatchStart = Instant.now();
    int dispatchedEventCount = 0;
    if (eventCount > 0) {
        try (TransactionCommitConsumer commitConsumer =
                new TransactionCommitConsumer(eventHandler, connectorConfig, schema)) {
            for (Iterator<LogMinerEvent> events = getTransactionEventIterator(committedTx);
                    events.hasNext(); ) {
                if (!context.isRunning()) {
                    // Stop right away; the consumer is still closed by try-with-resources.
                    return;
                }
                final LogMinerEvent event = events.next();
                dispatchedEventCount++;
                LOGGER.trace(
                        "Dispatching event {} {}", dispatchedEventCount, event.getEventType());
                commitConsumer.accept(event);
            }
        }
    }

    offsetContext.setEventScn(commitScn);
    // A transaction without events, or one from an excluded user, only produces a heartbeat.
    if (getTransactionEventCount(committedTx) <= 0 || userExcluded) {
        dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
    } else {
        dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
    }

    metrics.calculateLagMetrics(row.getChangeTime());

    finalizeTransactionCommit(txId, commitScn);
    removeTransactionAndEventsFromCache(committedTx);

    metrics.incrementCommittedTransactions();
    metrics.setActiveTransactions(getTransactionCache().size());
    metrics.incrementCommittedDmlCount(dispatchedEventCount);
    metrics.setCommittedScn(commitScn);
    metrics.setOffsetScn(offsetContext.getScn());
    metrics.setLastCommitDuration(Duration.between(dispatchStart, Instant.now()));
}