in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java [221:249]
public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
String transactionalId, long checkpointId) {
FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
if (producer == null) {
producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
producerInit.accept(producer);
} else if (transactionalId != null) {
producer.setTransactionId(transactionalId);
}
if (transactionalId != null) {
// first keep track of the transaction and producer because initTransaction may be
// interrupted
CheckpointTransaction checkpointedTransaction =
new CheckpointTransaction(transactionalId, checkpointId);
ProducerEntry existing =
producerByTransactionalId.put(
transactionalId, new ProducerEntry(producer, checkpointedTransaction));
transactionalIdsByCheckpoint.put(checkpointedTransaction, transactionalId);
checkState(
existing == null,
"Transaction %s already ongoing existing producer %s; new producer %s",
transactionalId,
existing,
producer);
producer.initTransactions();
}
LOG.debug("getProducer {}, new pool size {}", producer, producerPool.size());
return producer;
}