in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java [300:340]
public void resumeTransaction(long producerId, short epoch) {
    checkState(!isInTransaction(), "Already in transaction %s", transactionalId);
    checkState(
            producerId >= 0 && epoch >= 0,
            "Incorrect values for producerId %s and epoch %s",
            producerId,
            epoch);
    LOG.info(
            "Attempting to resume transaction {} with producerId {} and epoch {}",
            transactionalId,
            producerId,
            epoch);
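
    // Kafka exposes no public API for adopting a transaction opened by a
    // different producer instance, so this method reaches into the producer's
    // internal TransactionManager via reflection and replays the state
    // changes that initTransactions()/beginTransaction() would perform,
    // without issuing an InitProducerIdRequest to the broker.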
    Object transactionManager = getTransactionManager();
    synchronized (transactionManager) {
        Object txnPartitionMap = getField(transactionManager, "txnPartitionMap");
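
        // Walk the TransactionManager's state machine to INITIALIZING and
        // clear any per-partition bookkeeping left over from a previous
        // transaction, mirroring what a fresh initTransactions() call would
        // do locally.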
        transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
        invoke(txnPartitionMap, "reset");
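
        // Inject the producerId/epoch recovered from the checkpoint so the
        // broker attributes subsequent requests (in particular the final
        // EndTxnRequest) to the transaction that was originally pre-committed.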
        setField(
                transactionManager,
                PRODUCER_ID_AND_EPOCH_FIELD_NAME,
                createProducerIdAndEpoch(producerId, epoch));

        transitionTransactionManagerStateTo(transactionManager, "READY");
        transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");

        // The transactionStarted flag in Kafka's TransactionManager controls
        // whether an EndTxnRequest is actually sent to the broker on a commit
        // or abort call. Kafka sets the flag only after the first send, i.e.
        // only if data has actually been written to some partition.
        // Checkpoints only ever store metadata of pre-committed transactions
        // that contain records; therefore, when recovery producers are created
        // on restore to resume and commit those transactions, the flag must
        // always be set.
        setField(transactionManager, "transactionStarted", true);
    }
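    // Mark the producer as holding a pre-committed transaction; the actual
    // second-phase commit (or abort) is issued against it later.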
    this.transactionState = TransactionState.PRECOMMITTED;
}