in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java [198:242]
/**
 * Puts the wrapped Kafka producer into the {@code IN_TRANSACTION} state using a caller-supplied
 * producer id and epoch.
 *
 * <p>The whole operation runs while holding {@code producerClosingLock}, and the manipulation
 * of the producer's {@code transactionManager} additionally runs while holding that object's
 * monitor. The transaction manager is driven through {@code INITIALIZING}, {@code READY} and
 * {@code IN_TRANSACTION} in that order; the order of the steps below must not be changed.
 *
 * <p>NOTE(review): {@code getField}, {@code setField} and {@code invoke} are presumably
 * reflection helpers operating on Kafka client internals, so this method is likely sensitive to
 * the Kafka client version on the classpath - confirm before upgrading.
 *
 * @param producerId producer id of the transaction to resume; must be non-negative
 * @param epoch producer epoch of the transaction to resume; must be non-negative
 */
public void resumeTransaction(long producerId, short epoch) {
    synchronized (producerClosingLock) {
        ensureNotClosed();

        // Both identifiers have to be non-negative; the check is delegated to Preconditions.
        final boolean idAndEpochNonNegative = producerId >= 0 && epoch >= 0;
        Preconditions.checkState(
                idAndEpochNonNegative,
                "Incorrect values for producerId %s and epoch %s",
                producerId,
                epoch);

        LOG.info(
                "Attempting to resume transaction {} with producerId {} and epoch {}",
                transactionalId,
                producerId,
                epoch);

        final Object txnManager = getField(kafkaProducer, "transactionManager");
        synchronized (txnManager) {
            final Object partitionMap = getField(txnManager, "txnPartitionMap");

            // Step 1: move to INITIALIZING and reset the partition map.
            invoke(
                    txnManager,
                    "transitionTo",
                    getEnum(
                            "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
            invoke(partitionMap, "reset");

            // Step 2: install the caller-supplied producer id and epoch.
            setField(
                    txnManager,
                    "producerIdAndEpoch",
                    createProducerIdAndEpoch(producerId, epoch));

            // Step 3: move to READY, then straight on to IN_TRANSACTION.
            invoke(
                    txnManager,
                    "transitionTo",
                    getEnum(
                            "org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
            invoke(
                    txnManager,
                    "transitionTo",
                    getEnum(
                            "org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));

            // Step 4: flag the transaction as started on the transaction manager.
            setField(txnManager, "transactionStarted", true);
        }
    }
}