public void commit()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java [59:122]


    public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<KafkaCommittable> request : requests) {
            final KafkaCommittable committable = request.getCommittable();
            final String transactionalId = committable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", transactionalId);
            Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> recyclable =
                    committable.getProducer();
            FlinkKafkaInternalProducer<?, ?> producer;
            try {
                producer =
                        recyclable
                                .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                .orElseGet(() -> getRecoveryProducer(committable));
                producer.commitTransaction();
                producer.flush();
                recyclable.ifPresent(Recyclable::close);
            } catch (RetriableException e) {
                LOG.warn(
                        "Encountered retriable exception while committing {}.", transactionalId, e);
                request.retryLater();
            } catch (ProducerFencedException e) {
                // initTransaction has been called on this transaction before
                LOG.error(
                        "Unable to commit transaction ({}) because its producer is already fenced."
                                + " This means that you either have a different producer with the same '{}' (this is"
                                + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
                                + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
                                + " please consult the Flink documentation for more details.",
                        request,
                        ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                        KafkaSink.class.getSimpleName(),
                        ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                        kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
                        e);
                recyclable.ifPresent(Recyclable::close);
                request.signalFailedWithKnownReason(e);
            } catch (InvalidTxnStateException e) {
                // This exception only occurs when aborting after a commit or vice versa.
                // It does not appear on double commits or double aborts.
                LOG.error(
                        "Unable to commit transaction ({}) because it's in an invalid state. "
                                + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
                        request,
                        e);
                recyclable.ifPresent(Recyclable::close);
                request.signalFailedWithKnownReason(e);
            } catch (UnknownProducerIdException e) {
                LOG.error(
                        "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
                        request,
                        e);
                recyclable.ifPresent(Recyclable::close);
                request.signalFailedWithKnownReason(e);
            } catch (Exception e) {
                LOG.error(
                        "Transaction ({}) encountered error and data has been potentially lost.",
                        request,
                        e);
                recyclable.ifPresent(Recyclable::close);
                request.signalFailedWithUnknownReason(e);
            }
        }
    }