public FlinkKafkaInternalProducer getTransactionalProducer()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java [221:249]


    /**
     * Obtains a producer from the pool, creating and initializing a new one when the pool is
     * empty, and binds it to the given transactional id.
     *
     * <p>When {@code transactionalId} is non-null, the producer is registered under that id
     * together with a {@link CheckpointTransaction} built from the id and {@code checkpointId},
     * and {@code initTransactions()} is invoked on it. The registration deliberately happens
     * before {@code initTransactions()} so that the producer stays tracked even if that call is
     * interrupted.
     *
     * <p>When {@code transactionalId} is {@code null}, the producer is returned without being
     * registered and without {@code initTransactions()} being called; the transactional id of a
     * reused producer is left untouched by this method.
     *
     * <p>If a producer is already registered for {@code transactionalId}, the request is rejected
     * via {@code checkState} without modifying the tracking maps: the existing registration is
     * kept and the producer that was just obtained is handed back to the pool.
     *
     * @param transactionalId the transactional id to bind the producer to, may be {@code null}
     * @param checkpointId the checkpoint id recorded alongside the transactional id
     * @return the pooled or newly created producer
     * @throws IllegalStateException if the transactional id is already in use (assumes {@code
     *     checkState} follows the usual Preconditions contract - TODO confirm)
     */
    public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
            String transactionalId, long checkpointId) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
        if (producer == null) {
            // pool is empty: create a fresh producer and run the configured initializer on it
            producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
            producerInit.accept(producer);
        } else if (transactionalId != null) {
            // reused producer: re-target it to the requested transactional id
            producer.setTransactionId(transactionalId);
        }
        if (transactionalId != null) {
            // Verify that the id is free BEFORE touching any tracking state. Checking only after
            // put() would already have replaced the existing entry, dropping the ongoing
            // producer from tracking at the moment the check fails.
            ProducerEntry existing = producerByTransactionalId.get(transactionalId);
            if (existing != null) {
                // hand the unused producer back so it is not lost when the check below fails
                producerPool.add(producer);
            }
            checkState(
                    existing == null,
                    "Transaction %s already ongoing existing producer %s; new producer %s",
                    transactionalId,
                    existing,
                    producer);
            // first keep track of the transaction and producer because initTransaction may be
            // interrupted
            CheckpointTransaction checkpointedTransaction =
                    new CheckpointTransaction(transactionalId, checkpointId);
            producerByTransactionalId.put(
                    transactionalId, new ProducerEntry(producer, checkpointedTransaction));
            transactionalIdsByCheckpoint.put(checkpointedTransaction, transactionalId);
            producer.initTransactions();
        }
        LOG.debug("getProducer {}, new pool size {}", producer, producerPool.size());
        return producer;
    }