in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java [48:78]
/**
 * Commits the given transactions one at a time through {@code xaFacade} and collects the
 * per-transaction outcome.
 *
 * <p>The passed list is mutated: each transaction is removed from {@code xids} right before its
 * commit is attempted. Whatever is still left in {@code xids} when the loop stops (i.e. was never
 * attempted) is appended to the result's retry list.
 *
 * <p>Outcome bookkeeping per transaction:
 *
 * <ul>
 *   <li>commit returned normally: recorded via {@code result.succeeded};
 *   <li>{@code TransientXaException}: recorded via {@code result.failedTransiently} with a copy
 *       obtained from {@code withAttemptsIncremented()};
 *   <li>any other {@code Exception}: recorded via {@code result.failed}.
 * </ul>
 *
 * @param xids transactions to commit; its iterator must support {@code remove()}
 * @param allowOutOfOrderCommits when {@code false}, the loop stops as soon as {@code
 *     result.hasNoFailures()} turns false; when {@code true}, all transactions are attempted
 * @param maxCommitAttempts forwarded to {@code throwIfAnyReachedMaxAttempts}
 * @return the accumulated result of this commit round
 */
public GroupXaOperationResult<CheckpointAndXid> commit(
        List<CheckpointAndXid> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
    GroupXaOperationResult<CheckpointAndXid> result = new GroupXaOperationResult<>();
    int totalCount = xids.size();
    LOG.debug("commit {} transactions", totalCount);

    Iterator<CheckpointAndXid> pending = xids.iterator();
    while (pending.hasNext()) {
        // Preserve ordering: once something has failed, stop unless out-of-order is allowed.
        if (!result.hasNoFailures() && !allowOutOfOrderCommits) {
            break;
        }
        CheckpointAndXid current = pending.next();
        // Detach the transaction from the input first; from here on only the result tracks it.
        pending.remove();
        try {
            xaFacade.commit(current.xid, current.restored);
            result.succeeded(current);
        } catch (TransientXaException transientError) {
            result.failedTransiently(current.withAttemptsIncremented(), transientError);
        } catch (Exception error) {
            result.failed(current, error);
        }
    }

    // Anything not attempted (loop stopped early) is kept for a later retry.
    result.getForRetry().addAll(xids);

    // NOTE(review): both calls presumably throw when their condition is met - confirm in
    // GroupXaOperationResult / throwIfAnyReachedMaxAttempts.
    result.throwIfAnyFailed("commit");
    throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);

    // The cause goes last so the logger receives it as the trailing argument.
    result.getTransientFailure()
            .ifPresent(
                    cause ->
                            LOG.warn(
                                    "failed to commit {} transactions out of {} (keep them to retry later)",
                                    result.getForRetry().size(),
                                    totalCount,
                                    cause));
    return result;
}