private CompletableFuture processSingleEntryAction()

in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [1852:2019]


    /**
     * Processes a read-write replica request that targets a single row.
     *
     * <p>The request's binary row is passed to {@code resolveRowByPk} as the search key, and the callback receives the resolved row id
     * and row (a {@code null} row id presumably means that no row matches the key - confirm against {@code resolveRowByPk}). What happens
     * next depends on the request type:
     * <ul>
     *     <li>{@code RW_GET} - takes the locks for a read; completes with the resolved row, or {@code null} if the row id is
     *     {@code null}.</li>
     *     <li>{@code RW_DELETE} - takes the locks for a delete and applies an update command with a {@code null} row; completes with
     *     {@code true}, or {@code false} if the row id is {@code null}.</li>
     *     <li>{@code RW_GET_AND_DELETE} - same as {@code RW_DELETE}, but completes with the resolved row, or {@code null} if the row id
     *     is {@code null}.</li>
     *     <li>{@code RW_DELETE_EXACT} - completes with {@code false} if the row id is {@code null} or if
     *     {@code takeLocksForDeleteExact} yields a {@code null} row id; otherwise applies an update command with a {@code null} row
     *     and completes with {@code true}.</li>
     *     <li>{@code RW_INSERT} - completes with {@code false} if a row id was resolved; otherwise writes the request row under a
     *     freshly generated row id and completes with {@code true}.</li>
     *     <li>{@code RW_UPSERT} - writes the request row under the resolved row id, or under a freshly generated one if none was
     *     resolved; completes with {@code null}.</li>
     *     <li>{@code RW_GET_AND_UPSERT} - same as {@code RW_UPSERT}, but completes with the resolved row.</li>
     *     <li>{@code RW_GET_AND_REPLACE} - completes with {@code null} if the row id is {@code null}; otherwise writes the request row
     *     under the resolved row id and completes with the resolved row.</li>
     *     <li>{@code RW_REPLACE_IF_EXIST} - completes with {@code false} if the row id is {@code null}; otherwise writes the request
     *     row under the resolved row id and completes with {@code true}.</li>
     * </ul>
     *
     * @param request Single-row read-write request.
     * @return Future completed with the result described above for the request type.
     * @throws IgniteInternalException Thrown synchronously (not through the returned future) if the request type is not one of the
     *      types listed above.
     */
    private CompletableFuture<Object> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request) {
        UUID txId = request.transactionId();
        BinaryRow searchRow = request.binaryRow();
        TablePartitionId commitPartitionId = request.commitPartitionId();
        boolean full = request.full();

        // Only RW_GET is allowed to come without a commit partition: it is the only branch that never builds an update command.
        assert commitPartitionId != null || request.requestType() == RequestType.RW_GET :
                "Commit partition is null [type=" + request.requestType() + ']';

        switch (request.requestType()) {
            case RW_GET: {
                return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(null);
                    }

                    return takeLocksForGet(rowId, txId)
                            .thenApply(ignored -> row);
                });
            }
            case RW_DELETE: {
                return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(false);
                    }

                    // A null row in the update command denotes removal.
                    return takeLocksForDelete(row, rowId, txId)
                            .thenCompose(ignored -> applyUpdateCommand(
                                    updateCommand(commitPartitionId, rowId.uuid(), null, txId, full)))
                            .thenApply(ignored -> true);
                });
            }
            case RW_GET_AND_DELETE: {
                return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(null);
                    }

                    return takeLocksForDelete(row, rowId, txId)
                            .thenCompose(ignored -> applyUpdateCommand(
                                    updateCommand(commitPartitionId, rowId.uuid(), null, txId, full)))
                            .thenApply(ignored -> row);
                });
            }
            case RW_DELETE_EXACT: {
                return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(false);
                    }

                    return takeLocksForDeleteExact(searchRow, rowId, row, txId)
                            .thenCompose(validatedRowId -> {
                                // A null id from takeLocksForDeleteExact means there is nothing to delete.
                                if (validatedRowId == null) {
                                    return completedFuture(false);
                                }

                                return applyUpdateCommand(
                                        updateCommand(commitPartitionId, validatedRowId.uuid(), null, txId, full))
                                        .thenApply(ignored -> true);
                            });
                });
            }
            case RW_INSERT: {
                return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId != null) {
                        return completedFuture(false);
                    }

                    RowId newRowId = new RowId(partId(), UUID.randomUUID());

                    return writeRowAndReleaseShortTermLocks(
                            takeLocksForInsert(searchRow, newRowId, txId), commitPartitionId, newRowId, searchRow, txId, full, true);
                });
            }
            case RW_UPSERT: {
                return resolveRowByPk(searchRow, txId, (rowId, row) ->
                        upsertRowAndReleaseShortTermLocks(rowId, searchRow, commitPartitionId, txId, full, null));
            }
            case RW_GET_AND_UPSERT: {
                return resolveRowByPk(searchRow, txId, (rowId, row) ->
                        upsertRowAndReleaseShortTermLocks(rowId, searchRow, commitPartitionId, txId, full, row));
            }
            case RW_GET_AND_REPLACE: {
                return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(null);
                    }

                    return writeRowAndReleaseShortTermLocks(
                            takeLocksForUpdate(searchRow, rowId, txId), commitPartitionId, rowId, searchRow, txId, full, row);
                });
            }
            case RW_REPLACE_IF_EXIST: {
                return resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(false);
                    }

                    return writeRowAndReleaseShortTermLocks(
                            takeLocksForUpdate(searchRow, rowId, txId), commitPartitionId, rowId, searchRow, txId, full, true);
                });
            }
            default: {
                throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                        format("Unknown single request [actionType={}]", request.requestType()));
            }
        }
    }

    /**
     * Inserts or updates a row: takes the insert locks and generates a new row id when {@code existingRowId} is {@code null}, takes the
     * update locks for {@code existingRowId} otherwise, then writes {@code newRow} and releases the short term locks.
     *
     * @param existingRowId Row id resolved for the key, may be {@code null}.
     * @param newRow Row to write.
     * @param commitPartitionId Commit partition id passed to the update command.
     * @param txId Transaction id.
     * @param full Value of the request's {@code full} flag, passed to the update command as is.
     * @param result Value the returned future is completed with, may be {@code null}.
     * @return Future completed with {@code result} after the update command is applied and the short term locks are released.
     */
    private <T> CompletableFuture<T> upsertRowAndReleaseShortTermLocks(
            RowId existingRowId,
            BinaryRow newRow,
            TablePartitionId commitPartitionId,
            UUID txId,
            boolean full,
            T result
    ) {
        boolean insert = existingRowId == null;

        RowId targetRowId = insert ? new RowId(partId(), UUID.randomUUID()) : existingRowId;

        CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert
                ? takeLocksForInsert(newRow, targetRowId, txId)
                : takeLocksForUpdate(newRow, targetRowId, txId);

        return writeRowAndReleaseShortTermLocks(lockFut, commitPartitionId, targetRowId, newRow, txId, full, result);
    }

    /**
     * Waits for the given locks, applies an update command that writes {@code newRow} under {@code rowId}, and then releases the short
     * term locks carried by the lock future's result.
     *
     * <p>The update command is built only after {@code lockFut} completes.
     *
     * @param lockFut Future of the acquired locks; the second element of its tuple holds the short term locks to release.
     * @param commitPartitionId Commit partition id passed to the update command.
     * @param rowId Id of the row to write.
     * @param newRow Row to write.
     * @param txId Transaction id.
     * @param full Value of the request's {@code full} flag, passed to the update command as is.
     * @param result Value the returned future is completed with, may be {@code null}.
     * @return Future completed with {@code result} after the update command is applied and the short term locks are released.
     */
    private <T> CompletableFuture<T> writeRowAndReleaseShortTermLocks(
            CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut,
            TablePartitionId commitPartitionId,
            RowId rowId,
            BinaryRow newRow,
            UUID txId,
            boolean full,
            T result
    ) {
        // NOTE(review): the release below runs only on the success path (thenApply). If taking the locks or applying the command fails,
        // the short term locks are not released here - presumably the transaction cleanup takes care of them; confirm.
        return lockFut
                .thenCompose(rowIdLock -> applyUpdateCommand(
                        updateCommand(commitPartitionId, rowId.uuid(), newRow, txId, full))
                        .thenApply(ignored -> rowIdLock))
                .thenApply(rowIdLock -> {
                    releaseShortTermLocks(rowIdLock);

                    return result;
                });
    }

    /**
     * Releases every lock from the second element of the given tuple through the lock manager.
     *
     * @param rowIdLock Tuple whose second element is the collection of short term locks to release.
     */
    private void releaseShortTermLocks(IgniteBiTuple<RowId, Collection<Lock>> rowIdLock) {
        rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
    }