public void resumeTransaction()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java [198:242]


    /**
     * Resumes a transaction on this producer using a previously obtained producer id and epoch.
     *
     * <p>While holding {@code producerClosingLock}, and then the monitor of the object stored in
     * the {@code transactionManager} field of {@code kafkaProducer}, this method:
     *
     * <ol>
     *   <li>moves the transaction manager to {@code State.INITIALIZING} and resets its {@code
     *       txnPartitionMap},
     *   <li>overwrites its {@code producerIdAndEpoch} with the supplied values,
     *   <li>moves it through {@code State.READY} to {@code State.IN_TRANSACTION}, and
     *   <li>sets its {@code transactionStarted} field to {@code true}.
     * </ol>
     *
     * <p>NOTE(review): {@code getField}, {@code setField}, {@code invoke} and {@code getEnum} are
     * helpers defined elsewhere in this class; they presumably use reflection to reach Kafka
     * client internals by name, so the string names below are tied to the Kafka client version
     * on the classpath - confirm when upgrading.
     *
     * @param producerId producer id of the transaction to resume; must be {@code >= 0}
     * @param epoch producer epoch of the transaction to resume; must be {@code >= 0}
     */
    public void resumeTransaction(long producerId, short epoch) {
        synchronized (producerClosingLock) {
            ensureNotClosed();

            // Both identifiers must be non-negative before any internal state is touched.
            final boolean idAndEpochValid = producerId >= 0 && epoch >= 0;
            Preconditions.checkState(
                    idAndEpochValid,
                    "Incorrect values for producerId %s and epoch %s",
                    producerId,
                    epoch);

            LOG.info(
                    "Attempting to resume transaction {} with producerId {} and epoch {}",
                    transactionalId,
                    producerId,
                    epoch);

            final Object txnManager = getField(kafkaProducer, "transactionManager");
            synchronized (txnManager) {
                final Object partitionMap = getField(txnManager, "txnPartitionMap");

                // Step 1: go to INITIALIZING, then reset the partition map.
                final Object initializingState =
                        getEnum(
                                "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING");
                invoke(txnManager, "transitionTo", initializingState);
                invoke(partitionMap, "reset");

                // Step 2: install the caller-supplied producer id and epoch.
                final Object resumedIdAndEpoch = createProducerIdAndEpoch(producerId, epoch);
                setField(txnManager, "producerIdAndEpoch", resumedIdAndEpoch);

                // Step 3: READY first, then IN_TRANSACTION; the order of transitions is kept
                // exactly as it was.
                final Object readyState =
                        getEnum(
                                "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY");
                invoke(txnManager, "transitionTo", readyState);

                final Object inTransactionState =
                        getEnum(
                                "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION");
                invoke(txnManager, "transitionTo", inTransactionState);

                // Step 4: flag the transaction as started on the transaction manager.
                setField(txnManager, "transactionStarted", true);
            }
        }
    }