in src/main/java/org/apache/flink/connector/rocketmq/sink/committer/RocketMQCommitter.java [52:83]
public void commit(Collection<CommitRequest<SendCommittable>> requests) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
try {
for (CommitRequest<SendCommittable> request : requests) {
final SendCommittable committable = request.getCommittable();
LOG.info("Commit transaction message, send committable={}", committable);
try {
CompletableFuture<Void> future =
this.getTransactionProducer()
.commit(committable)
.thenAccept(unused -> request.signalAlreadyCommitted())
.exceptionally(
throwable -> {
LOG.error(
"Commit message error, committable={}",
committable);
request.signalFailedWithKnownReason(throwable);
return null;
});
futures.add(future);
} catch (Throwable e) {
LOG.error("Commit message error, committable={}", committable, e);
request.signalFailedWithKnownReason(e);
}
}
CompletableFuture<Void> allFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFuture.get();
} catch (Exception e) {
LOG.error("Commit message error", e);
}
}