public void resumeTransaction()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java [300:340]


    /**
     * Puts this producer back into an already running transaction that is identified by the
     * given producer id and epoch, instead of beginning a new one.
     *
     * <p>While holding the monitor of the object returned by {@code getTransactionManager()},
     * the manager is moved to {@code INITIALIZING}, its {@code txnPartitionMap} is reset, the
     * producer id and epoch are overwritten, and the state is advanced through {@code READY}
     * to {@code IN_TRANSACTION}. Finally the {@code transactionStarted} flag is forced to
     * {@code true} and this producer's own state becomes {@code PRECOMMITTED}.
     *
     * <p>Both preconditions below are enforced through {@code checkState}.
     *
     * @param producerId producer id of the transaction to resume; must not be negative
     * @param epoch producer epoch of the transaction to resume; must not be negative
     */
    public void resumeTransaction(long producerId, short epoch) {
        final boolean alreadyInTransaction = isInTransaction();
        checkState(!alreadyInTransaction, "Already in transaction %s", transactionalId);

        final boolean idAndEpochValid = !(producerId < 0 || epoch < 0);
        checkState(
                idAndEpochValid,
                "Incorrect values for producerId %s and epoch %s",
                producerId,
                epoch);

        LOG.info(
                "Attempting to resume transaction {} with producerId {} and epoch {}",
                transactionalId,
                producerId,
                epoch);

        final Object txnManager = getTransactionManager();
        synchronized (txnManager) {
            // Grab the partition map first, then rewind the manager before clearing it.
            final Object partitionMap = getField(txnManager, "txnPartitionMap");
            transitionTransactionManagerStateTo(txnManager, "INITIALIZING");
            invoke(partitionMap, "reset");

            // Swap in the identity of the transaction that is being resumed.
            setField(
                    txnManager,
                    PRODUCER_ID_AND_EPOCH_FIELD_NAME,
                    createProducerIdAndEpoch(producerId, epoch));

            // Step the manager forward until it reports an open transaction.
            transitionTransactionManagerStateTo(txnManager, "READY");
            transitionTransactionManagerStateTo(txnManager, "IN_TRANSACTION");

            // KafkaProducer only sends an EndTxnRequest for commit/abort when its
            // transactionStarted flag is set, and that normally happens on the first
            // send (i.e. once data has been written to some partition). Checkpoints
            // only ever store metadata of pre-committed transactions that actually
            // contain records, so a recovery producer that resumes such a transaction
            // in order to commit it must always have this flag set.
            setField(txnManager, "transactionStarted", true);
        }

        this.transactionState = TransactionState.PRECOMMITTED;
    }