protected void handleCommit()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java [362:508]


    /**
     * Processes a commit row read from LogMiner for the transaction the row refers to.
     *
     * <p>The commit is skipped when the transaction was recently processed or cannot be found in
     * the transaction cache. When the offset's commit SCN reports that this commit was already
     * handled, the transaction and its events are removed from the cache and nothing is
     * dispatched. Otherwise every event of the transaction is fed through a {@link
     * TransactionCommitConsumer}, the offset context is updated per event, and finally either a
     * transaction-committed event or a heartbeat is dispatched before the metrics are refreshed.
     *
     * @param partition the partition passed along to the dispatcher and the record emitters
     * @param row the LogMiner row describing the commit
     * @throws InterruptedException declared by the event handler and propagated to the caller
     */
    protected void handleCommit(OraclePartition partition, LogMinerEventRow row)
            throws InterruptedException {
        final String txId = row.getTransactionId();
        if (isRecentlyProcessed(txId)) {
            LOGGER.debug("\tTransaction is already committed, skipped.");
            return;
        }

        final T committedTx = getAndRemoveTransactionFromCache(txId);
        if (committedTx == null) {
            LOGGER.trace("Transaction {} not found, commit skipped.", txId);
            return;
        }

        // Publish the smallest SCN that remains in the transaction cache; -1 is reported when
        // the cache yields a null SCN.
        final Scn minCachedScn = getTransactionCacheMinimumScn();
        if (minCachedScn.isNull()) {
            metrics.setOldestScn(Scn.valueOf(-1));
        } else {
            metrics.setOldestScn(minCachedScn);
        }

        final Scn commitScn = row.getScn();
        if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
            // Nothing to emit: discard the cached transaction and refresh the active count.
            final Scn threadCommitScn =
                    offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
            LOGGER.debug(
                    "Transaction {} has already been processed. "
                            + "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.",
                    txId,
                    offsetContext.getCommitScn(),
                    commitScn,
                    threadCommitScn);
            removeTransactionAndEventsFromCache(committedTx);
            metrics.setActiveTransactions(getTransactionCache().size());
            return;
        }

        counters.commitCount++;

        final int eventCount = getTransactionEventCount(committedTx);
        LOGGER.trace("Commit (smallest SCN {}) {}", minCachedScn, row);
        LOGGER.trace("Transaction {} has {} events", txId, eventCount);

        final ZoneOffset dbZoneOffset = metrics.getDatabaseOffset();
        final boolean userExcluded = isTransactionUserExcluded(committedTx);

        final TransactionCommitConsumer.Handler<LogMinerEvent> eventHandler =
                new TransactionCommitConsumer.Handler<LogMinerEvent>() {
                    // Event total used to recognise the last event of the transaction.
                    private final int expectedEvents = getTransactionEventCount(committedTx);

                    @Override
                    public void accept(LogMinerEvent event, long eventsProcessed)
                            throws InterruptedException {
                        // The offset SCN is only moved when the commit SCN is below the SCN of
                        // every other transaction still cached (or when none is cached).
                        if (minCachedScn.isNull() || commitScn.compareTo(minCachedScn) < 0) {
                            offsetContext.setScn(event.getScn());
                            metrics.setOldestScn(event.getScn());
                        }

                        offsetContext.setEventScn(event.getScn());
                        offsetContext.setTransactionId(txId);
                        offsetContext.setSourceTime(
                                event.getChangeTime().minusSeconds(dbZoneOffset.getTotalSeconds()));
                        offsetContext.setTableId(event.getTableId());
                        offsetContext.setRedoThread(row.getThread());
                        if (eventsProcessed == expectedEvents) {
                            // Last event reached: record the commit SCN in the offsets.
                            offsetContext.getCommitScn().recordCommit(row);
                        }

                        // The cast is performed before the exclusion check on purpose, so it is
                        // applied to every event regardless of the user filter.
                        final DmlEvent dmlEvent = (DmlEvent) event;
                        if (userExcluded) {
                            return;
                        }

                        final LogMinerChangeRecordEmitter emitter;
                        if (dmlEvent instanceof TruncateEvent) {
                            // LogMiner reports a truncate as a DDL event type, so the emitter is
                            // explicitly told to produce a TRUNCATE operation.
                            emitter =
                                    new LogMinerChangeRecordEmitter(
                                            connectorConfig,
                                            partition,
                                            offsetContext,
                                            Envelope.Operation.TRUNCATE,
                                            dmlEvent.getDmlEntry().getOldValues(),
                                            dmlEvent.getDmlEntry().getNewValues(),
                                            getSchema().tableFor(event.getTableId()),
                                            getSchema(),
                                            Clock.system(),
                                            dmlEvent.getRowId());
                        } else {
                            emitter =
                                    new LogMinerChangeRecordEmitter(
                                            connectorConfig,
                                            partition,
                                            offsetContext,
                                            dmlEvent.getEventType(),
                                            dmlEvent.getDmlEntry().getOldValues(),
                                            dmlEvent.getDmlEntry().getNewValues(),
                                            getSchema().tableFor(event.getTableId()),
                                            getSchema(),
                                            Clock.system(),
                                            dmlEvent.getRowId());
                        }
                        dispatcher.dispatchDataChangeEvent(partition, event.getTableId(), emitter);
                    }
                };

        final Instant commitStart = Instant.now();
        int dispatched = 0;
        if (eventCount > 0) {
            try (TransactionCommitConsumer consumer =
                    new TransactionCommitConsumer(eventHandler, connectorConfig, schema)) {
                for (Iterator<LogMinerEvent> events = getTransactionEventIterator(committedTx);
                        events.hasNext(); ) {
                    // Abandon the commit as soon as the connector is no longer running.
                    if (!context.isRunning()) {
                        return;
                    }

                    final LogMinerEvent event = events.next();
                    dispatched++;
                    LOGGER.trace("Dispatching event {} {}", dispatched, event.getEventType());
                    consumer.accept(event);
                }
            }
        }

        offsetContext.setEventScn(commitScn);
        // A commit event is only emitted for non-empty transactions of non-excluded users;
        // every other case falls back to a heartbeat.
        final boolean emitTransactionCommit =
                getTransactionEventCount(committedTx) > 0 && !userExcluded;
        if (emitTransactionCommit) {
            dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
        } else {
            dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
        }

        metrics.calculateLagMetrics(row.getChangeTime());

        finalizeTransactionCommit(txId, commitScn);
        removeTransactionAndEventsFromCache(committedTx);

        metrics.incrementCommittedTransactions();
        metrics.setActiveTransactions(getTransactionCache().size());
        metrics.incrementCommittedDmlCount(dispatched);
        metrics.setCommittedScn(commitScn);
        metrics.setOffsetScn(offsetContext.getScn());
        metrics.setLastCommitDuration(Duration.between(commitStart, Instant.now()));
    }