public void commit()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java [69:145]


    /**
     * Commits the Pulsar transaction carried by every request in the given collection and reports
     * each unsuccessful attempt back on the request it belongs to.
     *
     * <p>A commit call that returns normally is not signalled in any way. Every exception raised by
     * the commit call is logged and mapped onto exactly one request callback: failed with a known
     * reason, already committed, retry later, or failed with an unknown reason.
     *
     * @param requests the commit requests; each one supplies the committable whose transaction id
     *     is committed
     * @throws PulsarClientException declared on the signature; presumably raised while obtaining
     *     the transaction coordinator client, since exceptions from the commit call itself are
     *     caught inside the loop (confirm against {@code transactionCoordinatorClient()})
     */
    public void commit(Collection<CommitRequest<PulsarCommittable>> requests)
            throws PulsarClientException {
        TransactionCoordinatorClient coordinatorClient = transactionCoordinatorClient();

        for (CommitRequest<PulsarCommittable> request : requests) {
            PulsarCommittable pending = request.getCommittable();
            TxnID transactionId = pending.getTxnID();
            String topicName = pending.getTopic();

            LOG.debug(
                    "Start committing the Pulsar transaction {} for topic {}",
                    transactionId,
                    topicName);
            try {
                coordinatorClient.commit(transactionId);
            } catch (CoordinatorNotFoundException coordinatorMissing) {
                LOG.error(
                        "We couldn't find the Transaction Coordinator from Pulsar broker {}. "
                                + "Check your broker configuration.",
                        pending,
                        coordinatorMissing);
                request.signalFailedWithKnownReason(coordinatorMissing);
            } catch (InvalidTxnStatusException invalidStatus) {
                LOG.error(
                        "Unable to commit transaction ({}) because it's in an invalid state. "
                                + "Most likely the transaction has been aborted for some reason. "
                                + "Please check the Pulsar broker logs for more details.",
                        pending,
                        invalidStatus);
                // Logged as an error, yet the request is deliberately not failed: it is
                // reported as already committed.
                request.signalAlreadyCommitted();
            } catch (TransactionNotFoundException txnMissing) {
                if (request.getNumberOfRetries() != 0) {
                    // The transaction vanished only after at least one retry, so an earlier
                    // attempt is assumed to have gone through; treat it as committed.
                    LOG.warn(
                            "We can't find the transaction {} after {} retry committing. "
                                    + "This may mean that the transaction have been committed in previous but failed with timeout. "
                                    + "So we just mark it as committed.",
                            transactionId,
                            request.getNumberOfRetries());
                    request.signalAlreadyCommitted();
                } else {
                    // Unknown to the broker on the very first attempt: report a failure.
                    LOG.error(
                            "Unable to commit transaction ({}) because it's not found on Pulsar broker. "
                                    + "Most likely the checkpoint interval exceed the transaction timeout.",
                            pending,
                            txnMissing);
                    request.signalFailedWithKnownReason(txnMissing);
                }
            } catch (MetaStoreHandlerNotExistsException handlerMissing) {
                LOG.error(
                        "We can't find the meta store handler by the mostSigBits from TxnID {}. "
                                + "Did you change the metadata for topic {}?",
                        pending,
                        TRANSACTION_COORDINATOR_ASSIGN,
                        handlerMissing);
                request.signalFailedWithKnownReason(handlerMissing);
            } catch (TransactionCoordinatorClientException retriable) {
                // Any coordinator client exception not matched by the clauses above is
                // treated as retriable, bounded by the configured recommit limit.
                LOG.error(
                        "Encountered retriable exception while committing transaction {} for topic {}.",
                        pending,
                        topicName,
                        retriable);
                int retryLimit = sinkConfiguration.getMaxRecommitTimes();
                if (request.getNumberOfRetries() >= retryLimit) {
                    request.signalFailedWithKnownReason(
                            new FlinkRuntimeException(
                                    String.format(
                                            "Failed to commit transaction %s after retrying %d times",
                                            transactionId, retryLimit),
                                    retriable));
                } else {
                    request.retryLater();
                }
            } catch (Exception unexpected) {
                LOG.error(
                        "Transaction ({}) encountered unknown error and data could be potentially lost.",
                        pending,
                        unexpected);
                request.signalFailedWithUnknownReason(unexpected);
            }
        }
    }