public void recycleByTransactionId()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java [119:163]


    // Marks the transaction with the given transactional id as finished and releases its
    // producer.
    //
    // transactionalId: id of the finished transaction; if no producer is registered under this
    //                  id, the call only logs and returns.
    // success:         true  -> the producer is handed to recycleProducer(...),
    //                  false -> the producer is handed to closeProducer(...).
    //
    // Additionally, if older checkpoints still have registered transactions, all entries with a
    // checkpoint id smaller than the finished one are dropped and their producers are closed.
    public void recycleByTransactionId(String transactionalId, boolean success) {
        ProducerEntry producerEntry = producerByTransactionalId.remove(transactionalId);
        LOG.debug("Transaction {} finished, producer {}", transactionalId, producerEntry);

        if (producerEntry == null) {
            LOG.info(
                    "Received unmatched producer for transaction {}. This is expected during rescale.",
                    transactionalId);
            // recycle of unmatched entries happens on next checkpoint at the second half of this
            // method
            return;
        }

        long finishedChkId = producerEntry.getCheckpointedTransaction().getCheckpointId();
        // Must be evaluated before the finished transaction is removed below: it compares the
        // oldest registered checkpoint id with the one that just finished.
        boolean hasTransactionsFromPreviousCheckpoint =
                transactionalIdsByCheckpoint.firstKey().getCheckpointId() != finishedChkId;
        transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
        if (success) {
            recycleProducer(producerEntry.getProducer());
        } else {
            closeProducer(producerEntry.getProducer());
        }

        // In rare cases (non-chained committer or recovery), some transactions may not be detected
        // to be finished.
        // For example, a transaction may be committed at the same time the writer state is
        // snapshot. The writer contains the transaction as ongoing but the committer state will
        // later not contain it.
        // In these cases, we make use of the fact that committables are processed in order of the
        // checkpoint id.
        // That means a transaction state with checkpoint id C implies that all C' < C are finished.
        if (hasTransactionsFromPreviousCheckpoint) {
            // We can safely remove all transactions with checkpoint id < finishedChkId.
            // Entries are primarily sorted by checkpoint id
            Iterator<Map.Entry<CheckpointTransaction, String>> iterator =
                    transactionalIdsByCheckpoint.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<CheckpointTransaction, String> entry = iterator.next();
                long staleChkId = entry.getKey().getCheckpointId();
                if (staleChkId < finishedChkId) {
                    // Read the value BEFORE iterator.remove(): a Map.Entry is undefined once the
                    // backing map has been modified (a sorted map may reuse the removed node for
                    // its successor), so reading it afterwards can yield another transaction's id.
                    String staleTransactionalId = entry.getValue();
                    iterator.remove();

                    ProducerEntry staleEntry =
                            producerByTransactionalId.remove(staleTransactionalId);
                    if (staleEntry == null) {
                        // Both maps are expected to be in sync; do not abort the cleanup of the
                        // remaining stale transactions if they are not.
                        LOG.warn(
                                "No producer registered for stale transaction {} of checkpoint {}.",
                                staleTransactionalId,
                                staleChkId);
                        continue;
                    }
                    closeProducer(staleEntry.getProducer());
                }
            }
        }
    }