GroupXaOperationResult commit()

in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOps.java [35:98]


    /**
     * Commits the given group of transactions and reports the outcome in a {@link
     * GroupXaOperationResult}.
     *
     * <p>NOTE(review): no implementation is visible from this declaration; the parameter semantics
     * below are inferred from the names and must be confirmed against the implementing class.
     *
     * @param xids the entries to commit
     * @param allowOutOfOrderCommits presumably whether processing may continue past an entry that
     *     could not be committed - TODO confirm
     * @param maxCommitAttempts presumably the upper bound on commit attempts per entry - TODO
     *     confirm
     * @return the result object describing the outcome for the given entries
     */
    GroupXaOperationResult<CheckpointAndXid> commit(
            List<CheckpointAndXid> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts);

    /**
     * Fails or rolls back the given group of transactions and reports the outcome in a {@link
     * GroupXaOperationResult}.
     *
     * <p>NOTE(review): when an entry is "failed" versus "rolled back" is decided by the
     * implementation, which is not visible here - confirm against the implementing class.
     *
     * @param xids the transactions to fail or roll back
     * @return the result object describing the outcome for the given transactions
     */
    GroupXaOperationResult<Xid> failOrRollback(Collection<Xid> xids);

    /**
     * Recovers transactions and rolls them back.
     *
     * <p>NOTE(review): presumably the {@code xidGenerator} together with the {@code
     * runtimeContext} is used to decide which recovered transactions belong to this job/subtask -
     * the implementation is not visible here, TODO confirm.
     *
     * @param runtimeContext the runtime context of the calling function
     * @param xidGenerator the generator of the transaction ids
     */
    void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator xidGenerator);

    class GroupXaOperationResult<T> {
        private final List<T> succeeded = new ArrayList<>();
        private final List<T> failed = new ArrayList<>();
        private final List<T> toRetry = new ArrayList<>();
        private Optional<Exception> failure = Optional.empty();
        private Optional<Exception> transientFailure = Optional.empty();

        void failedTransiently(T x, XaFacade.TransientXaException e) {
            toRetry.add(x);
            transientFailure =
                    getTransientFailure().isPresent() ? getTransientFailure() : Optional.of(e);
        }

        void failed(T x, Exception e) {
            failed.add(x);
            failure = failure.isPresent() ? failure : Optional.of(e);
        }

        void succeeded(T x) {
            succeeded.add(x);
        }

        private FlinkRuntimeException wrapFailure(
                Exception error, String formatWithCounts, int errCount) {
            return new FlinkRuntimeException(
                    String.format(formatWithCounts, errCount, total()), error);
        }

        private int total() {
            return succeeded.size() + failed.size() + toRetry.size();
        }

        List<T> getForRetry() {
            return toRetry;
        }

        Optional<Exception> getTransientFailure() {
            return transientFailure;
        }

        boolean hasNoFailures() {
            return !failure.isPresent() && !transientFailure.isPresent();
        }

        void throwIfAnyFailed(String action) {
            failure.map(
                            f ->
                                    wrapFailure(
                                            f,
                                            "failed to " + action + " %d transactions out of %d",
                                            toRetry.size() + failed.size()))
                    .ifPresent(
                            f -> {
                                throw f;
                            });
        }
    }