in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java [327:359]
private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
long startCheckpointId, List<String> prefixesToAbort) {
TransactionAbortStrategyImpl.TransactionAborter aborter =
transactionalId -> {
// getTransactionalProducer already calls initTransactions, which cancels the
// transaction
FlinkKafkaInternalProducer<byte[], byte[]> producer =
producerPool.getTransactionalProducer(transactionalId, 0);
LOG.debug("Aborting transaction {}", transactionalId);
producer.flush();
short epoch = producer.getEpoch();
producerPool.recycle(producer);
return epoch;
};
Set<String> precommittedTransactionalIds =
recoveredStates.stream()
.flatMap(
s ->
s.getPrecommittedTransactionalIds().stream()
.map(CheckpointTransaction::getTransactionalId))
.collect(Collectors.toSet());
return new TransactionAbortStrategyContextImpl(
this::getTopicNames,
kafkaSinkContext.getParallelInstanceId(),
kafkaSinkContext.getNumberOfParallelInstances(),
ownedSubtaskIds,
totalNumberOfOwnedSubtasks,
prefixesToAbort,
startCheckpointId,
aborter,
this::getAdminClient,
precommittedTransactionalIds);
}