protected void recoverAndCommit()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [1035:1065]


    /**
     * Commits a recovered Kafka transaction.
     *
     * <p>Nothing happens when the given state is not transactional. Otherwise a transactional
     * producer is initialised for the state's transactional id, resumed with the state's
     * producer id and epoch, and asked to commit. An {@code InvalidTxnStateException} or a
     * {@code ProducerFencedException} raised along the way is logged as a warning and not
     * rethrown; any other exception propagates to the caller. If a producer was created, it is
     * closed with a zero-second timeout regardless of the outcome.
     *
     * @param transaction the recovered transaction state to commit
     */
    protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
        // Guard clause: only transactional states carry something to commit.
        if (!transaction.isTransactional()) {
            return;
        }

        FlinkKafkaInternalProducer<byte[], byte[]> recoveredProducer = null;
        try {
            recoveredProducer = initTransactionalProducer(transaction.transactionalId, false);
            recoveredProducer.resumeTransaction(transaction.producerId, transaction.epoch);
            recoveredProducer.commitTransaction();
        } catch (InvalidTxnStateException invalidStateError) {
            final String invalidStateMessage =
                    "Unable to commit recovered transaction ({}) because it's in an invalid state. "
                            + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.";
            // The exception is passed last so the logger records it alongside the message.
            LOG.warn(invalidStateMessage, transaction, invalidStateError);
        } catch (ProducerFencedException fencedError) {
            final String fencedMessage =
                    "Unable to commit recovered transaction ({}) because its producer is already fenced."
                            + " This means that you either have a different producer with the same '{}' or"
                            + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
                            + " please consult the Flink documentation for more details.";
            LOG.warn(
                    fencedMessage,
                    transaction,
                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                    getTransactionTimeout(producerConfig),
                    fencedError);
        } finally {
            // The producer stays null if initialisation itself threw.
            if (recoveredProducer != null) {
                recoveredProducer.close(Duration.ofSeconds(0));
            }
        }
    }