in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java [140:172]
private void logFencedRequest(
CommitRequest<KafkaCommittable> request, ProducerFencedException e) {
if (reusesTransactionalIds) {
// If checkpoint 1 succeeds, checkpoint 2 is aborted, and checkpoint 3 may reuse the id
// of checkpoint 1. A recovery of checkpoint 1 would show that the transaction has been
// fenced.
LOG.warn(
"Unable to commit transaction ({}) because its producer is already fenced."
+ " If this warning appears as part of the recovery of a checkpoint, it is expected in some cases (e.g., aborted checkpoints in previous attempt)."
+ " If it's outside of recovery, this means that you either have a different sink with the same '{}'"
+ " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+ " please consult the Flink documentation for more details.",
request,
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
e);
} else {
// initTransaction has been called on this transaction before
LOG.error(
"Unable to commit transaction ({}) because its producer is already fenced."
+ " This means that you either have a different producer with the same '{}' (this is"
+ " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
+ " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+ " please consult the Flink documentation for more details.",
request,
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
KafkaSink.class.getSimpleName(),
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
e);
}
}