in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java [119:163]
public void recycleByTransactionId(String transactionalId, boolean success) {
ProducerEntry producerEntry = producerByTransactionalId.remove(transactionalId);
LOG.debug("Transaction {} finished, producer {}", transactionalId, producerEntry);
if (producerEntry == null) {
LOG.info(
"Received unmatched producer for transaction {}. This is expected during rescale.",
transactionalId);
// recycle of unmatched entries happens on next checkpoint at the second half of this
// method
return;
}
long finishedChkId = producerEntry.getCheckpointedTransaction().getCheckpointId();
boolean hasTransactionsFromPreviousCheckpoint =
transactionalIdsByCheckpoint.firstKey().getCheckpointId() != finishedChkId;
transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
if (success) {
recycleProducer(producerEntry.getProducer());
} else {
closeProducer(producerEntry.getProducer());
}
// In rare cases (non-chained committer or recovery), some transactions may not be detected
// to be finished.
// For example, a transaction may be committed at the same time the writer state is
// snapshot. The writer contains the transaction as ongoing but the committer state will
// later not contain it.
// In these cases, we make use of the fact that committables are processed in order of the
// checkpoint id.
// That means a transaction state with checkpoint id C implies that all C' < C are finished.
if (hasTransactionsFromPreviousCheckpoint) {
// We can safely remove all transactions with checkpoint id < finishedChkId.
// Entries are primarily sorted by checkpoint id
Iterator<Map.Entry<CheckpointTransaction, String>> iterator =
transactionalIdsByCheckpoint.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<CheckpointTransaction, String> entry = iterator.next();
if (entry.getKey().getCheckpointId() < finishedChkId) {
iterator.remove();
closeProducer(producerByTransactionalId.remove(entry.getValue()).getProducer());
}
}
}
}