in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [1035:1065]
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    // Commits a transaction handed back during recovery. The two failure modes caught
    // below are reported at WARN level and are NOT rethrown, so the caller does not see
    // them; any other exception propagates after the producer has been closed.

    // Guard: state that is not transactional has nothing to commit.
    if (!transaction.isTransactional()) {
        return;
    }

    FlinkKafkaInternalProducer<byte[], byte[]> recoveryProducer = null;
    try {
        // NOTE(review): the meaning of the 'false' flag is defined by
        // initTransactionalProducer, which is not visible in this block.
        recoveryProducer = initTransactionalProducer(transaction.transactionalId, false);
        // Re-attach to the transaction identified by the stored producer id and epoch,
        // then commit it.
        recoveryProducer.resumeTransaction(transaction.producerId, transaction.epoch);
        recoveryProducer.commitTransaction();
    } catch (InvalidTxnStateException invalidStateFailure) {
        final String invalidStateMessage =
                "Unable to commit recovered transaction ({}) because it's in an invalid state. "
                        + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.";
        LOG.warn(invalidStateMessage, transaction, invalidStateFailure);
    } catch (ProducerFencedException fencedFailure) {
        final String fencedMessage =
                "Unable to commit recovered transaction ({}) because its producer is already fenced."
                        + " This means that you either have a different producer with the same '{}' or"
                        + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
                        + " please consult the Flink documentation for more details.";
        LOG.warn(
                fencedMessage,
                transaction,
                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                getTransactionTimeout(producerConfig),
                fencedFailure);
    } finally {
        // The producer is only non-null if initialisation succeeded; close it with a
        // zero-second timeout regardless of how the commit attempt ended.
        if (recoveryProducer != null) {
            recoveryProducer.close(Duration.ofSeconds(0));
        }
    }
}