in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java [204:234]
private XaTransactionResult<TransactionId> commit(
List<TransactionId> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
XaTransactionResult<TransactionId> result = new XaTransactionResult<>();
int origSize = xids.size();
LOG.debug("commit {} transactions", origSize);
for (Iterator<TransactionId> i = xids.iterator();
i.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits); ) {
TransactionId x = i.next();
i.remove();
try {
xaConnectionProvider.commit(x, x.getRestored());
result.succeeded(x);
} catch (TransientXaException e) {
result.failedTransiently(x.withAttemptsIncremented(), e);
} catch (Exception e) {
result.failed(x, e);
}
}
result.getForRetry().addAll(xids);
result.throwIfAnyFailed("commit");
throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
result.getTransientFailure()
.ifPresent(
f ->
LOG.warn(
"failed to commit {} transactions out of {} (keep them to retry later)",
result.getForRetry().size(),
origSize,
f));
return result;
}