public List getTransactionsToAbort()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionsToAbortChecker.java [57:88]


    public List<String> getTransactionsToAbort(Map<Integer, Map<Long, String>> openTransactions) {
        final List<String> transactionalIdsToAbort = new ArrayList<>();
        for (final Map.Entry<Integer, Map<Long, String>> subtaskOffsetMapping :
                openTransactions.entrySet()) {
            final Map<Long, String> checkpointOffsetTransactionalIdMapping =
                    subtaskOffsetMapping.getValue();
            // All transactions from this subtask have been closed
            if (checkpointOffsetTransactionalIdMapping.isEmpty()) {
                continue;
            }
            // Abort all open transactions if checkpointOffset 0 is open implying that no checkpoint
            // finished.
            // Cut the transactions in ranges to speed up abort process
            if (Collections.min(checkpointOffsetTransactionalIdMapping.keySet())
                            == MINIMUM_CHECKPOINT_OFFSET
                    && subtaskOffsetMapping.getKey() % numberOfParallelSubtasks == subtaskId) {
                transactionalIdsToAbort.addAll(checkpointOffsetTransactionalIdMapping.values());
            } else {
                // Check all open transactions against recovered ones and close if the open
                // transaction is equal or higher to the offset
                for (final Map.Entry<Long, String> offsetTransactionId :
                        checkpointOffsetTransactionalIdMapping.entrySet()) {
                    if (!hasSameSubtaskWithHigherCheckpoint(
                            subtaskOffsetMapping.getKey(), offsetTransactionId.getKey())) {
                        continue;
                    }
                    transactionalIdsToAbort.add(offsetTransactionId.getValue());
                }
            }
        }
        return transactionalIdsToAbort;
    }