private CompletableFuture trackingInvoke()

in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java [648:755]


    private <R> CompletableFuture<R> trackingInvoke(
            InternalTransaction tx,
            int partId,
            Function<Long, ReplicaRequest> mapFunc,
            boolean full,
            PendingTxPartitionEnlistment enlistment,
            @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
            int retryOnLockConflict
    ) {
        assert !tx.isReadOnly() : format("Tracking invoke is available only for read-write transactions [tx={}].", tx);

        enlistment.addTableId(tableId);

        ReplicaRequest request = mapFunc.apply(enlistment.consistencyToken());

        boolean write = request instanceof SingleRowReplicaRequest && ((SingleRowReplicaRequest) request).requestType() != RW_GET
                || request instanceof MultipleRowReplicaRequest && ((MultipleRowReplicaRequest) request).requestType() != RW_GET_ALL
                || request instanceof SingleRowPkReplicaRequest && ((SingleRowPkReplicaRequest) request).requestType() != RW_GET
                || request instanceof MultipleRowPkReplicaRequest && ((MultipleRowPkReplicaRequest) request).requestType() != RW_GET_ALL
                || request instanceof SwapRowReplicaRequest;

        if (full) { // Full transaction retries are handled in postEnlist.
            return replicaSvc.invokeRaw(enlistment.primaryNodeConsistentId(), request).handle((r, e) -> {
                boolean hasError = e != null;
                assert hasError || r instanceof TimestampAware;

                // Timestamp is set to commit timestamp for full transactions.
                tx.finish(!hasError, hasError ? null : ((TimestampAware) r).timestamp(), true, false);

                if (e != null) {
                    sneakyThrow(e);
                }

                return (R) r.result();
            });
        } else {
            if (write) { // Track only write requests from explicit transactions.
                if (!tx.remote() && !transactionInflights.addInflight(tx.id(), false)) {
                    int code = TX_ALREADY_FINISHED_ERR;
                    if (tx.isRolledBackWithTimeoutExceeded()) {
                        code = TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
                    }

                    return failedFuture(
                            new TransactionException(code, format(
                                    "Transaction is already finished [tableName={}, partId={}, txState={}, timeoutExceeded={}].",
                                    tableName,
                                    partId,
                                    tx.state(),
                                    tx.isRolledBackWithTimeoutExceeded()
                            )));
                }

                return replicaSvc.<R>invoke(enlistment.primaryNodeConsistentId(), request).thenApply(res -> {
                    assert noWriteChecker != null;

                    // Remove inflight if no replication was scheduled, otherwise inflight will be removed by delayed response.
                    if (!tx.remote() && noWriteChecker.test(res, request)) {
                        transactionInflights.removeInflight(tx.id());
                    }

                    return res;
                }).handle((r, e) -> {
                    if (e != null) {
                        if (retryOnLockConflict > 0 && matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) {
                            if (!tx.remote()) {
                                transactionInflights.removeInflight(tx.id()); // Will be retried.
                            }

                            return trackingInvoke(
                                    tx,
                                    partId,
                                    ignored -> request,
                                    false,
                                    enlistment,
                                    noWriteChecker,
                                    retryOnLockConflict - 1
                            );
                        }

                        sneakyThrow(e);
                    }

                    return completedFuture(r);
                }).thenCompose(identity());
            } else { // Explicit reads should be retried too.
                return replicaSvc.<R>invoke(enlistment.primaryNodeConsistentId(), request).handle((r, e) -> {
                    if (e != null) {
                        if (retryOnLockConflict > 0 && matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) {
                            return trackingInvoke(
                                    tx,
                                    partId,
                                    ignored -> request,
                                    false,
                                    enlistment,
                                    noWriteChecker,
                                    retryOnLockConflict - 1
                            );
                        }

                        sneakyThrow(e);
                    }

                    return completedFuture(r);
                }).thenCompose(identity());
            }
        }
    }