private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java [327:359]


    private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
            long startCheckpointId, List<String> prefixesToAbort) {
        TransactionAbortStrategyImpl.TransactionAborter aborter =
                transactionalId -> {
                    // getTransactionalProducer already calls initTransactions, which cancels the
                    // transaction
                    FlinkKafkaInternalProducer<byte[], byte[]> producer =
                            producerPool.getTransactionalProducer(transactionalId, 0);
                    LOG.debug("Aborting transaction {}", transactionalId);
                    producer.flush();
                    short epoch = producer.getEpoch();
                    producerPool.recycle(producer);
                    return epoch;
                };
        Set<String> precommittedTransactionalIds =
                recoveredStates.stream()
                        .flatMap(
                                s ->
                                        s.getPrecommittedTransactionalIds().stream()
                                                .map(CheckpointTransaction::getTransactionalId))
                        .collect(Collectors.toSet());
        return new TransactionAbortStrategyContextImpl(
                this::getTopicNames,
                kafkaSinkContext.getParallelInstanceId(),
                kafkaSinkContext.getNumberOfParallelInstances(),
                ownedSubtaskIds,
                totalNumberOfOwnedSubtasks,
                prefixesToAbort,
                startCheckpointId,
                aborter,
                this::getAdminClient,
                precommittedTransactionalIds);
    }