public void commit()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java [91:138]


    public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<KafkaCommittable> request : requests) {
            final KafkaCommittable committable = request.getCommittable();
            final String transactionalId = committable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", transactionalId);
            Optional<FlinkKafkaInternalProducer<?, ?>> writerProducer = committable.getProducer();
            FlinkKafkaInternalProducer<?, ?> producer = null;
            try {
                producer = writerProducer.orElseGet(() -> getProducer(committable));
                producer.commitTransaction();
                backchannel.send(TransactionFinished.successful(committable.getTransactionalId()));
            } catch (RetriableException e) {
                LOG.warn(
                        "Encountered retriable exception while committing {}.", transactionalId, e);
                request.retryLater();
            } catch (ProducerFencedException e) {
                logFencedRequest(request, e);
                handleFailedTransaction(producer);
                request.signalFailedWithKnownReason(e);
            } catch (InvalidTxnStateException e) {
                // This exception only occurs when aborting after a commit or vice versa.
                // It does not appear on double commits or double aborts.
                LOG.error(
                        "Unable to commit transaction ({}) because it's in an invalid state. "
                                + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
                        request,
                        e);
                handleFailedTransaction(producer);
                request.signalFailedWithKnownReason(e);
            } catch (UnknownProducerIdException e) {
                LOG.error(
                        "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
                        request,
                        e);
                handleFailedTransaction(producer);
                request.signalFailedWithKnownReason(e);
            } catch (Exception e) {
                LOG.error(
                        "Transaction ({}) encountered error and data has been potentially lost.",
                        request,
                        e);
                closeCommitterProducer(producer);
                // cause failover
                request.signalFailedWithUnknownReason(e);
            }
        }
    }