in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java [298:325]
/**
 * Aborts transactions that may still be open from a previous execution.
 *
 * <p>The currently configured {@code transactionalIdPrefix} is always included. If the first
 * recovered state carries a different prefix, that prefix is included as well and a warning
 * is logged. The collected prefixes are then handed to the configured
 * {@code transactionAbortStrategy}.
 *
 * @param recoveredStates writer states restored from the previous execution; only the first
 *     element (in the collection's stream order) is inspected
 * @param startCheckpointId checkpoint id passed on to the abort strategy context
 */
private void abortLingeringTransactions(
        Collection<KafkaWriterState> recoveredStates, long startCheckpointId) {
    LOG.info(
            "Aborting lingering transactions from previous execution. Recovered states: {}.",
            recoveredStates);

    // The current prefix always comes first; a differing previous prefix is appended after it.
    final List<String> abortPrefixes = new ArrayList<>();
    abortPrefixes.add(transactionalIdPrefix);

    // NOTE(review): only the first recovered state is consulted, presumably because all
    // states of one execution share the same prefix - confirm against the state layout.
    recoveredStates.stream()
            .findFirst()
            .ifPresent(
                    previousState -> {
                        final String previousPrefix = previousState.getTransactionalIdPrefix();
                        if (previousPrefix.equals(transactionalIdPrefix)) {
                            return;
                        }
                        abortPrefixes.add(previousPrefix);
                        LOG.warn(
                                "Transactional id prefix from previous execution {} has changed to {}.",
                                previousPrefix,
                                transactionalIdPrefix);
                    });

    LOG.info(
            "Aborting lingering transactions with prefixes {} using {}",
            abortPrefixes,
            transactionAbortStrategy);
    transactionAbortStrategy.abortTransactions(
            getTransactionAbortStrategyContext(startCheckpointId, abortPrefixes));
}