public void commit()

in src/main/java/org/apache/flink/connector/rocketmq/sink/committer/RocketMQCommitter.java [52:83]


    public void commit(Collection<CommitRequest<SendCommittable>> requests) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (CommitRequest<SendCommittable> request : requests) {
                final SendCommittable committable = request.getCommittable();
                LOG.info("Commit transaction message, send committable={}", committable);
                try {
                    CompletableFuture<Void> future =
                            this.getTransactionProducer()
                                    .commit(committable)
                                    .thenAccept(unused -> request.signalAlreadyCommitted())
                                    .exceptionally(
                                            throwable -> {
                                                LOG.error(
                                                        "Commit message error, committable={}",
                                                        committable);
                                                request.signalFailedWithKnownReason(throwable);
                                                return null;
                                            });
                    futures.add(future);
                } catch (Throwable e) {
                    LOG.error("Commit message error, committable={}", committable, e);
                    request.signalFailedWithKnownReason(e);
                }
            }
            CompletableFuture<Void> allFuture =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            allFuture.get();
        } catch (Exception e) {
            LOG.error("Commit message error", e);
        }
    }