private void abortLingeringTransactions()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java [298:325]


    /**
     * Aborts transactions that may still be lingering from a previous execution.
     *
     * <p>The current {@code transactionalIdPrefix} is always included. If the first recovered
     * state carries a different prefix, that prefix is included as well and a warning is logged.
     * The aborting itself is delegated to the configured {@code transactionAbortStrategy}.
     *
     * @param recoveredStates writer states recovered from the previous execution; only the first
     *     element is inspected for its transactional id prefix
     * @param startCheckpointId checkpoint id handed to the abort strategy context
     */
    private void abortLingeringTransactions(
            Collection<KafkaWriterState> recoveredStates, long startCheckpointId) {
        LOG.info(
                "Aborting lingering transactions from previous execution. Recovered states: {}.",
                recoveredStates);

        // The current prefix is always a candidate for cleanup.
        final List<String> abortPrefixes = new ArrayList<>();
        abortPrefixes.add(transactionalIdPrefix);

        // Only the first recovered state is consulted; its prefix is added when it differs
        // from the one this writer is currently configured with.
        recoveredStates.stream()
                .findFirst()
                .filter(state -> !state.getTransactionalIdPrefix().equals(transactionalIdPrefix))
                .ifPresent(
                        state -> {
                            final String previousPrefix = state.getTransactionalIdPrefix();
                            abortPrefixes.add(previousPrefix);
                            LOG.warn(
                                    "Transactional id prefix from previous execution {} has changed to {}.",
                                    previousPrefix,
                                    transactionalIdPrefix);
                        });

        LOG.info(
                "Aborting lingering transactions with prefixes {} using {}",
                abortPrefixes,
                transactionAbortStrategy);
        transactionAbortStrategy.abortTransactions(
                getTransactionAbortStrategyContext(startCheckpointId, abortPrefixes));
    }