in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionsToAbortChecker.java [57:88]
public List<String> getTransactionsToAbort(Map<Integer, Map<Long, String>> openTransactions) {
final List<String> transactionalIdsToAbort = new ArrayList<>();
for (final Map.Entry<Integer, Map<Long, String>> subtaskOffsetMapping :
openTransactions.entrySet()) {
final Map<Long, String> checkpointOffsetTransactionalIdMapping =
subtaskOffsetMapping.getValue();
// All transactions from this subtask have been closed
if (checkpointOffsetTransactionalIdMapping.isEmpty()) {
continue;
}
// Abort all open transactions if checkpointOffset 0 is open implying that no checkpoint
// finished.
// Cut the transactions in ranges to speed up abort process
if (Collections.min(checkpointOffsetTransactionalIdMapping.keySet())
== MINIMUM_CHECKPOINT_OFFSET
&& subtaskOffsetMapping.getKey() % numberOfParallelSubtasks == subtaskId) {
transactionalIdsToAbort.addAll(checkpointOffsetTransactionalIdMapping.values());
} else {
// Check all open transactions against recovered ones and close if the open
// transaction is equal or higher to the offset
for (final Map.Entry<Long, String> offsetTransactionId :
checkpointOffsetTransactionalIdMapping.entrySet()) {
if (!hasSameSubtaskWithHigherCheckpoint(
subtaskOffsetMapping.getKey(), offsetTransactionId.getKey())) {
continue;
}
transactionalIdsToAbort.add(offsetTransactionId.getValue());
}
}
}
return transactionalIdsToAbort;
}