in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java [648:755]
/**
 * Builds a replica request for the given enlistment and sends it to the enlistment's primary node on behalf of a
 * read-write transaction.
 *
 * <p>Three paths are possible:
 * <ul>
 *     <li>{@code full == true}: the raw response is awaited, the transaction is finished with the outcome of the request
 *     (using the response timestamp on success) and the response result is returned;</li>
 *     <li>write request, {@code full == false}: for a non-remote transaction an inflight is registered before sending; the
 *     inflight is removed right away if {@code noWriteChecker} accepts the response, or before a retry;</li>
 *     <li>any other request, {@code full == false}: the request is sent without inflight tracking.</li>
 * </ul>
 *
 * <p>In the two non-full paths a failure matching {@code ACQUIRE_LOCK_ERR} re-sends the same request while
 * {@code retryOnLockConflict} is positive; any other failure is rethrown.
 *
 * @param tx Transaction; asserted to be not read-only.
 * @param partId Partition id; used in this method only for the error message and passed along on retry.
 * @param mapFunc Produces the replica request from the enlistment consistency token.
 * @param full {@code true} to take the full transaction path described above.
 * @param enlistment Partition enlistment supplying the consistency token and the primary node consistent id.
 * @param noWriteChecker Tested against the response and the request of a tracked write; asserted to be non-null on that path.
 * @param retryOnLockConflict Number of retries left for failures matching {@code ACQUIRE_LOCK_ERR}.
 * @param <R> Result type.
 * @return Future completed with the result of the request, or completed exceptionally on failure.
 */
private <R> CompletableFuture<R> trackingInvoke(
        InternalTransaction tx,
        int partId,
        Function<Long, ReplicaRequest> mapFunc,
        boolean full,
        PendingTxPartitionEnlistment enlistment,
        @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
        int retryOnLockConflict
) {
    assert !tx.isReadOnly() : format("Tracking invoke is available only for read-write transactions [tx={}].", tx);

    enlistment.addTableId(tableId);

    ReplicaRequest request = mapFunc.apply(enlistment.consistencyToken());

    // A request is a write when it is one of the row request kinds below with a type other than
    // RW_GET / RW_GET_ALL, or when it is a swap request.
    boolean write = request instanceof SingleRowReplicaRequest && ((SingleRowReplicaRequest) request).requestType() != RW_GET
            || request instanceof MultipleRowReplicaRequest && ((MultipleRowReplicaRequest) request).requestType() != RW_GET_ALL
            || request instanceof SingleRowPkReplicaRequest && ((SingleRowPkReplicaRequest) request).requestType() != RW_GET
            || request instanceof MultipleRowPkReplicaRequest && ((MultipleRowPkReplicaRequest) request).requestType() != RW_GET_ALL
            || request instanceof SwapRowReplicaRequest;

    if (full) {
        // Full transaction retries are handled in postEnlist.
        return replicaSvc.invokeRaw(enlistment.primaryNodeConsistentId(), request).handle((response, err) -> {
            boolean succeeded = err == null;

            assert !succeeded || response instanceof TimestampAware;

            // Timestamp is set to commit timestamp for full transactions.
            tx.finish(succeeded, succeeded ? ((TimestampAware) response).timestamp() : null, true, false);

            if (!succeeded) {
                sneakyThrow(err);
            }

            return (R) response.result();
        });
    }

    if (!write) {
        // Explicit reads should be retried too.
        return replicaSvc.<R>invoke(enlistment.primaryNodeConsistentId(), request).handle((result, err) -> {
            if (err != null) {
                boolean lockConflict = retryOnLockConflict > 0 && matchAny(unwrapCause(err), ACQUIRE_LOCK_ERR);

                if (lockConflict) {
                    return trackingInvoke(
                            tx,
                            partId,
                            unusedToken -> request,
                            false,
                            enlistment,
                            noWriteChecker,
                            retryOnLockConflict - 1
                    );
                }

                sneakyThrow(err);
            }

            return completedFuture(result);
        }).thenCompose(identity());
    }

    // Track only write requests from explicit transactions.
    if (!tx.remote() && !transactionInflights.addInflight(tx.id(), false)) {
        // The inflight was rejected: report why the transaction can no longer accept writes.
        int code = tx.isRolledBackWithTimeoutExceeded() ? TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR : TX_ALREADY_FINISHED_ERR;

        return failedFuture(
                new TransactionException(code, format(
                        "Transaction is already finished [tableName={}, partId={}, txState={}, timeoutExceeded={}].",
                        tableName,
                        partId,
                        tx.state(),
                        tx.isRolledBackWithTimeoutExceeded()
                )));
    }

    return replicaSvc.<R>invoke(enlistment.primaryNodeConsistentId(), request).thenApply(result -> {
        assert noWriteChecker != null;

        // Remove inflight if no replication was scheduled, otherwise inflight will be removed by delayed response.
        if (!tx.remote()) {
            if (noWriteChecker.test(result, request)) {
                transactionInflights.removeInflight(tx.id());
            }
        }

        return result;
    }).handle((result, err) -> {
        if (err != null) {
            boolean lockConflict = retryOnLockConflict > 0 && matchAny(unwrapCause(err), ACQUIRE_LOCK_ERR);

            if (lockConflict) {
                // The request will be retried, and the retry registers its own inflight.
                if (!tx.remote()) {
                    transactionInflights.removeInflight(tx.id());
                }

                return trackingInvoke(
                        tx,
                        partId,
                        unusedToken -> request,
                        false,
                        enlistment,
                        noWriteChecker,
                        retryOnLockConflict - 1
                );
            }

            sneakyThrow(err);
        }

        return completedFuture(result);
    }).thenCompose(identity());
}