private CompletableFuture processOperationRequest()

in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [662:804]


    /**
     * Dispatches a replica request to the handler that matches its concrete type.
     *
     * <p>Read-write row requests and the read-write scan request are routed through {@code appendTxCommand} together with an
     * {@link OperationId} built from the sender ID and the request timestamp. All other recognized request types are passed directly
     * to their dedicated processing method or handler.
     *
     * @param senderId ID of the sender; used to build the {@link OperationId} of read-write operations.
     * @param request Request to process.
     * @param replicaPrimacy Replica primacy information: its lease start time is passed to read-write actions and its primary flag
     *         is passed to read-only actions and to the safe time sync processing.
     * @param opStartTsIfDirectRo Timestamp that is forwarded to the direct read-only actions; may be {@code null}.
     * @return Future produced by the selected handler.
     * @throws UnsupportedReplicaRequestException If the request type is not recognized by this method.
     */
    private CompletableFuture<?> processOperationRequest(
            UUID senderId,
            ReplicaRequest request,
            ReplicaPrimacy replicaPrimacy,
            @Nullable HybridTimestamp opStartTsIfDirectRo
    ) {
        // Read-write row operations: each one is registered as a transaction command.

        if (request instanceof ReadWriteSingleRowReplicaRequest) {
            var singleRowReq = (ReadWriteSingleRowReplicaRequest) request;
            var operationId = new OperationId(senderId, singleRowReq.timestamp().longValue());

            return appendTxCommand(
                    singleRowReq.transactionId(),
                    operationId,
                    singleRowReq.requestType(),
                    singleRowReq.full(),
                    () -> processSingleEntryAction(singleRowReq, replicaPrimacy.leaseStartTime())
            );
        }

        if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
            var singleRowPkReq = (ReadWriteSingleRowPkReplicaRequest) request;
            var operationId = new OperationId(senderId, singleRowPkReq.timestamp().longValue());

            return appendTxCommand(
                    singleRowPkReq.transactionId(),
                    operationId,
                    singleRowPkReq.requestType(),
                    singleRowPkReq.full(),
                    () -> processSingleEntryAction(singleRowPkReq, replicaPrimacy.leaseStartTime())
            );
        }

        if (request instanceof ReadWriteMultiRowReplicaRequest) {
            var multiRowReq = (ReadWriteMultiRowReplicaRequest) request;
            var operationId = new OperationId(senderId, multiRowReq.timestamp().longValue());

            return appendTxCommand(
                    multiRowReq.transactionId(),
                    operationId,
                    multiRowReq.requestType(),
                    multiRowReq.full(),
                    () -> processMultiEntryAction(multiRowReq, replicaPrimacy.leaseStartTime())
            );
        }

        if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
            var multiRowPkReq = (ReadWriteMultiRowPkReplicaRequest) request;
            var operationId = new OperationId(senderId, multiRowPkReq.timestamp().longValue());

            return appendTxCommand(
                    multiRowPkReq.transactionId(),
                    operationId,
                    multiRowPkReq.requestType(),
                    multiRowPkReq.full(),
                    () -> processMultiEntryAction(multiRowPkReq, replicaPrimacy.leaseStartTime())
            );
        }

        if (request instanceof ReadWriteSwapRowReplicaRequest) {
            var swapRowReq = (ReadWriteSwapRowReplicaRequest) request;
            var operationId = new OperationId(senderId, swapRowReq.timestamp().longValue());

            return appendTxCommand(
                    swapRowReq.transactionId(),
                    operationId,
                    swapRowReq.requestType(),
                    swapRowReq.full(),
                    () -> processTwoEntriesAction(swapRowReq, replicaPrimacy.leaseStartTime())
            );
        }

        if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) {
            var scanReq = (ReadWriteScanRetrieveBatchReplicaRequest) request;

            // For a scan, request.full() means something slightly different than in the other requests: it marks an implicit
            // transaction. That is why `false` is always passed as the "full" flag to `appendTxCommand` below.
            // A SCAN is handled as 2pc; it switches to 1pc mode only when the transaction is implicit and all table rows fit
            // in the bucket (see the check in the completion callback below). Otherwise the transaction remains 2pc.
            txManager.updateTxMeta(scanReq.transactionId(), previous -> new TxStateMeta(
                    PENDING,
                    scanReq.coordinatorId(),
                    scanReq.commitPartitionId().asReplicationGroupId(),
                    null,
                    previous == null ? null : previous.tx(),
                    previous == null ? null : previous.isFinishedDueToTimeout()
            ));

            var operationId = new OperationId(senderId, scanReq.timestamp().longValue());

            // An implicit RW scan can be committed locally on the last batch or on an error.
            return appendTxCommand(scanReq.transactionId(), operationId, RW_SCAN, false, () -> processScanRetrieveBatchAction(scanReq))
                    .thenCompose(batch -> {
                        if (allElementsAreNull(batch)) {
                            return completedFuture(batch);
                        }

                        return validateRwReadAgainstSchemaAfterTakingLocks(scanReq.transactionId())
                                .thenApply(ignored -> batch);
                    })
                    .whenComplete((batch, failure) -> {
                        if (!scanReq.full()) {
                            return;
                        }

                        if (failure != null || batch.size() < scanReq.batchSize()) {
                            releaseTxLocks(scanReq.transactionId());
                        }
                    });
        }

        if (request instanceof ScanCloseReplicaRequest) {
            var scanCloseReq = (ScanCloseReplicaRequest) request;

            processScanCloseAction(scanCloseReq);

            return nullCompletedFuture();
        }

        // Transaction finish and write intent switch.

        if (request instanceof TxFinishReplicaRequest) {
            assert !enabledColocation() : request;

            var txFinishReq = (TxFinishReplicaRequest) request;

            return txFinishReplicaRequestHandler.handle(txFinishReq);
        }

        if (request instanceof WriteIntentSwitchReplicaRequest) {
            var switchReq = (WriteIntentSwitchReplicaRequest) request;

            return processWriteIntentSwitchAction(switchReq);
        }

        if (request instanceof TableWriteIntentSwitchReplicaRequest) {
            var tableSwitchReq = (TableWriteIntentSwitchReplicaRequest) request;

            return processTableWriteIntentSwitchAction(tableSwitchReq);
        }

        // Read-only operations.

        if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
            var roSingleReq = (ReadOnlySingleRowPkReplicaRequest) request;

            return processReadOnlySingleEntryAction(roSingleReq, replicaPrimacy.isPrimary());
        }

        if (request instanceof ReadOnlyMultiRowPkReplicaRequest) {
            var roMultiReq = (ReadOnlyMultiRowPkReplicaRequest) request;

            return processReadOnlyMultiEntryAction(roMultiReq, replicaPrimacy.isPrimary());
        }

        if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
            var roScanReq = (ReadOnlyScanRetrieveBatchReplicaRequest) request;

            return processReadOnlyScanRetrieveBatchAction(roScanReq, replicaPrimacy.isPrimary());
        }

        if (request instanceof ReplicaSafeTimeSyncRequest) {
            return processReplicaSafeTimeSyncRequest(replicaPrimacy.isPrimary());
        }

        if (request instanceof BuildIndexReplicaRequest) {
            var buildIndexReq = (BuildIndexReplicaRequest) request;

            return buildIndexReplicaRequestHandler.handle(buildIndexReq);
        }

        // Direct read-only operations.

        if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
            var directSingleReq = (ReadOnlyDirectSingleRowReplicaRequest) request;

            return processReadOnlyDirectSingleEntryAction(directSingleReq, opStartTsIfDirectRo);
        }

        if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
            var directMultiReq = (ReadOnlyDirectMultiRowReplicaRequest) request;

            return processReadOnlyDirectMultiEntryAction(directMultiReq, opStartTsIfDirectRo);
        }

        // Requests that are asserted to arrive here only when colocation is not enabled.

        if (request instanceof TxStateCommitPartitionRequest) {
            assert !enabledColocation() : request;

            var txStateReq = (TxStateCommitPartitionRequest) request;

            return txStateCommitPartitionReplicaRequestHandler.handle(txStateReq);
        }

        if (request instanceof VacuumTxStateReplicaRequest) {
            assert !enabledColocation() : request;

            var vacuumReq = (VacuumTxStateReplicaRequest) request;

            return vacuumTxStateReplicaRequestHandler.handle(vacuumReq);
        }

        if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) {
            assert !enabledColocation() : request;

            var minActiveTxTimeReq = (UpdateMinimumActiveTxBeginTimeReplicaRequest) request;

            return minimumActiveTxTimeReplicaRequestHandler.handle(minActiveTxTimeReq);
        }

        // Unknown request.
        throw new UnsupportedReplicaRequestException(request.getClass());
    }