private void logFencedRequest()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java [140:172]


    /**
     * Reports a commit that failed because the producer of its transaction is already fenced.
     *
     * <p>The severity depends on {@code reusesTransactionalIds}: when ids are reused the event is
     * logged as a warning, since it can be expected while recovering a checkpoint; otherwise it
     * is logged as an error.
     *
     * @param request the commit request whose transaction could not be committed
     * @param e the fencing exception, passed to the logger as the trailing throwable
     */
    private void logFencedRequest(
            CommitRequest<KafkaCommittable> request, ProducerFencedException e) {
        // Both messages report the configured transaction timeout, so look it up once.
        final String transactionTimeoutMs =
                kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);

        if (!reusesTransactionalIds) {
            // Ids are not reused here: initTransaction has been called on this transaction
            // before.
            LOG.error(
                    "Unable to commit transaction ({}) because its producer is already fenced."
                            + " This means that you either have a different producer with the same '{}' (this is"
                            + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
                            + " please consult the Flink documentation for more details.",
                    request,
                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    KafkaSink.class.getSimpleName(),
                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                    transactionTimeoutMs,
                    e);
            return;
        }

        // With reused ids: if checkpoint 1 succeeds and checkpoint 2 is aborted, checkpoint 3 may
        // pick up the id of checkpoint 1. Recovering checkpoint 1 would then find its transaction
        // fenced.
        LOG.warn(
                "Unable to commit transaction ({}) because its producer is already fenced."
                        + " If this warning appears as part of the recovery of a checkpoint, it is expected in some cases (e.g., aborted checkpoints in previous attempt)."
                        + " If it's outside of recovery, this means that you either have a different sink with the same '{}'"
                        + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
                        + " please consult the Flink documentation for more details.",
                request,
                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                transactionTimeoutMs,
                e);
    }