private CompletableFuture processMultiEntryAction()

in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [1508:1741]


    /**
     * Processes a read-write request that carries several rows, dispatching on the request type.
     *
     * <p>The value the returned future completes with depends on the request type:
     * <ul>
     *     <li>{@code RW_GET_ALL} - a list with one element per requested row, in request order; an element is {@code null}
     *     when no row id was resolved for that row;</li>
     *     <li>{@code RW_DELETE_ALL}, {@code RW_DELETE_EXACT_ALL} - the request rows for which no row id was locked,
     *     i.e. the rows that were not put into the delete command;</li>
     *     <li>{@code RW_INSERT_ALL} - the request rows that were not put into the insert command;</li>
     *     <li>{@code RW_UPSERT_ALL} - {@code null}.</li>
     * </ul>
     *
     * @param request Multi-row read-write replica request.
     * @return Future completing with the operation result described above.
     * @throws IgniteInternalException If the request type is not one of the multi-row types handled here.
     */
    private CompletableFuture<Object> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request) {
        UUID txId = request.transactionId();
        TablePartitionId committedPartitionId = request.commitPartitionId();
        boolean full = request.full();

        assert committedPartitionId != null || request.requestType() == RequestType.RW_GET_ALL
                : "Commit partition is null [type=" + request.requestType() + ']';

        switch (request.requestType()) {
            case RW_GET_ALL:
                return processMultiRowGetAll(request, txId);
            case RW_DELETE_ALL:
                return processMultiRowDeleteAll(request, txId, committedPartitionId, full);
            case RW_DELETE_EXACT_ALL:
                return processMultiRowDeleteExactAll(request, txId, committedPartitionId, full);
            case RW_INSERT_ALL:
                return processMultiRowInsertAll(request, txId, committedPartitionId, full);
            case RW_UPSERT_ALL:
                return processMultiRowUpsertAll(request, txId, committedPartitionId, full);
            default:
                throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                        format("Unknown multi request [actionType={}]", request.requestType()));
        }
    }

    /** Handles {@code RW_GET_ALL}: resolves every requested row by primary key and takes the "get" locks on the rows found. */
    private CompletableFuture<Object> processMultiRowGetAll(ReadWriteMultiRowReplicaRequest request, UUID txId) {
        // Read the rows once: every later step indexes the futures array in this same iteration order.
        Collection<BinaryRow> searchRows = request.binaryRows();

        CompletableFuture<BinaryRow>[] rowFuts = new CompletableFuture[searchRows.size()];

        int i = 0;

        for (BinaryRow searchRow : searchRows) {
            rowFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                if (rowId == null) {
                    return completedFuture(null);
                }

                return takeLocksForGet(rowId, txId)
                        .thenApply(ignored -> row);
            });
        }

        return allOf(rowFuts).thenApply(ignored -> {
            var result = new ArrayList<BinaryRow>(rowFuts.length);

            // All futures are complete at this point, so join() does not block.
            for (CompletableFuture<BinaryRow> rowFut : rowFuts) {
                result.add(rowFut.join());
            }

            return result;
        });
    }

    /** Handles {@code RW_DELETE_ALL}: takes delete locks on every row resolved by primary key, then applies the deletes. */
    private CompletableFuture<Object> processMultiRowDeleteAll(
            ReadWriteMultiRowReplicaRequest request,
            UUID txId,
            TablePartitionId committedPartitionId,
            boolean full
    ) {
        Collection<BinaryRow> searchRows = request.binaryRows();

        CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[searchRows.size()];

        int i = 0;

        for (BinaryRow searchRow : searchRows) {
            rowIdLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                if (rowId == null) {
                    return completedFuture(null);
                }

                return takeLocksForDelete(row, rowId, txId);
            });
        }

        return allOf(rowIdLockFuts)
                .thenCompose(ignored -> applyMultiRowDelete(searchRows, rowIdLockFuts, txId, committedPartitionId, full));
    }

    /**
     * Handles {@code RW_DELETE_EXACT_ALL}: same flow as {@code RW_DELETE_ALL}, but the locks are taken through
     * {@code takeLocksForDeleteExact}, which also receives the requested row.
     */
    private CompletableFuture<Object> processMultiRowDeleteExactAll(
            ReadWriteMultiRowReplicaRequest request,
            UUID txId,
            TablePartitionId committedPartitionId,
            boolean full
    ) {
        Collection<BinaryRow> searchRows = request.binaryRows();

        CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];

        int i = 0;

        for (BinaryRow searchRow : searchRows) {
            deleteExactLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                if (rowId == null) {
                    return completedFuture(null);
                }

                return takeLocksForDeleteExact(searchRow, rowId, row, txId);
            });
        }

        return allOf(deleteExactLockFuts)
                .thenCompose(ignored -> applyMultiRowDelete(searchRows, deleteExactLockFuts, txId, committedPartitionId, full));
    }

    /**
     * Second stage shared by both multi-row delete flavours: collects the locked row ids into a single update-all command
     * and gathers the request rows that produced no locked row id.
     *
     * @param searchRows Request rows, iterated in the same order used to fill {@code lockFuts}.
     * @param lockFuts Already completed lock futures, one per request row; a {@code null} value means nothing to delete.
     * @param txId Transaction id.
     * @param committedPartitionId Commit partition id.
     * @param full Value of the request's {@code full} flag, passed through to the command.
     * @return Future completing with the request rows that were not deleted.
     */
    private CompletableFuture<Object> applyMultiRowDelete(
            Collection<BinaryRow> searchRows,
            CompletableFuture<RowId>[] lockFuts,
            UUID txId,
            TablePartitionId committedPartitionId,
            boolean full
    ) {
        Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
        Collection<BinaryRow> notDeleted = new ArrayList<>();

        int futNum = 0;

        for (BinaryRow row : searchRows) {
            RowId lockedRowId = lockFuts[futNum++].join();

            if (lockedRowId != null) {
                // A null row message is what this code sends for a row id that is to be removed.
                rowIdsToDelete.put(lockedRowId.uuid(), null);
            } else {
                notDeleted.add(row);
            }
        }

        if (rowIdsToDelete.isEmpty()) {
            // Nothing was locked, so there is no command to apply.
            return completedFuture(notDeleted);
        }

        return applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, txId, full))
                .thenApply(ignored -> notDeleted);
    }

    /** Handles {@code RW_INSERT_ALL}: resolves existing row ids and extracts keys for every row, then inserts the new ones. */
    private CompletableFuture<Object> processMultiRowInsertAll(
            ReadWriteMultiRowReplicaRequest request,
            UUID txId,
            TablePartitionId committedPartitionId,
            boolean full
    ) {
        Collection<BinaryRow> searchRows = request.binaryRows();

        CompletableFuture<RowId>[] pkReadLockFuts = new CompletableFuture[searchRows.size()];
        CompletableFuture<BinaryTuple>[] pkTupleFuts = new CompletableFuture[searchRows.size()];

        int i = 0;

        for (BinaryRow searchRow : searchRows) {
            pkReadLockFuts[i] = resolveRowByPk(searchRow, txId,
                    (rowId, row) -> completedFuture(rowId));
            pkTupleFuts[i] = extractKey(searchRow);
            i++;
        }

        return allOf(ArrayUtils.concat(pkReadLockFuts, pkTupleFuts))
                .thenCompose(ignored -> applyMultiRowInsert(searchRows, pkReadLockFuts, pkTupleFuts, txId, committedPartitionId, full));
    }

    /**
     * Second stage of {@code RW_INSERT_ALL}: picks the rows to insert, takes insert locks on them, applies the update-all
     * command and then releases the locks returned alongside the row ids.
     *
     * <p>A row is skipped (and reported back in the result) when a row id was already resolved for it, or when an earlier
     * row of the same request has the same key bytes.
     *
     * @param searchRows Request rows, iterated in the same order used to fill both future arrays.
     * @param pkReadLockFuts Already completed futures with the resolved row id per request row, {@code null} if none.
     * @param pkTupleFuts Already completed futures with the extracted key tuple per request row.
     * @param txId Transaction id.
     * @param committedPartitionId Commit partition id.
     * @param full Value of the request's {@code full} flag, passed through to the command.
     * @return Future completing with the request rows that were not inserted.
     */
    private CompletableFuture<Object> applyMultiRowInsert(
            Collection<BinaryRow> searchRows,
            CompletableFuture<RowId>[] pkReadLockFuts,
            CompletableFuture<BinaryTuple>[] pkTupleFuts,
            UUID txId,
            TablePartitionId committedPartitionId,
            boolean full
    ) {
        Collection<BinaryRow> notInserted = new ArrayList<>();
        Map<RowId, BinaryRow> rowsToInsert = new HashMap<>();
        Set<ByteBuffer> uniqueKeys = new HashSet<>();

        int futNum = 0;

        for (BinaryRow row : searchRows) {
            RowId existingRowId = pkReadLockFuts[futNum].join();

            if (existingRowId != null) {
                notInserted.add(row);
            } else {
                BinaryTuple keyTuple = pkTupleFuts[futNum].join();
                ByteBuffer keyToCheck = keyTuple.byteBuffer();

                // Only the first row per key within this request is inserted; later duplicates are reported back.
                if (uniqueKeys.add(keyToCheck)) {
                    rowsToInsert.put(new RowId(partId(), UUID.randomUUID()), row);
                } else {
                    notInserted.add(row);
                }
            }

            futNum++;
        }

        if (rowsToInsert.isEmpty()) {
            return completedFuture(notInserted);
        }

        CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];

        int idx = 0;

        for (Map.Entry<RowId, BinaryRow> entry : rowsToInsert.entrySet()) {
            insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
        }

        Map<UUID, BinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream()
                .collect(Collectors.toMap(
                        e -> e.getKey().uuid(),
                        e -> MSG_FACTORY.binaryRowMessage()
                                .binaryTuple(e.getValue().tupleSlice())
                                .schemaVersion(e.getValue().schemaVersion())
                                .build()
                ));

        // NOTE(review): the locks are released only when the command succeeds; presumably the transaction cleanup
        // releases them on failure - confirm.
        return allOf(insertLockFuts)
                .thenCompose(ignored -> applyUpdateAllCommand(
                        updateAllCommand(committedPartitionId, convertedMap, txId, full)))
                .thenApply(ignored -> {
                    releaseMultiRowShortTermLocks(insertLockFuts);

                    return notInserted;
                });
    }

    /** Handles {@code RW_UPSERT_ALL}: takes insert or update locks per row depending on whether a row id was resolved. */
    private CompletableFuture<Object> processMultiRowUpsertAll(
            ReadWriteMultiRowReplicaRequest request,
            UUID txId,
            TablePartitionId committedPartitionId,
            boolean full
    ) {
        Collection<BinaryRow> searchRows = request.binaryRows();

        CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()];

        int i = 0;

        // NOTE(review): every row without a resolved row id gets a fresh RowId here and, unlike the insert path, keys are
        // not de-duplicated within the request - confirm that duplicate keys in one batch are handled elsewhere.
        for (BinaryRow searchRow : searchRows) {
            rowIdFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                boolean insert = rowId == null;

                RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId;

                return insert
                        ? takeLocksForInsert(searchRow, rowId0, txId)
                        : takeLocksForUpdate(searchRow, rowId0, txId);
            });
        }

        return allOf(rowIdFuts).thenCompose(ignored -> {
            Collection<BinaryRowMessage> rowMessages = request.binaryRowMessages();

            Map<UUID, BinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(rowMessages.size());

            int futNum = 0;

            // Row messages are paired with the lock futures by position.
            for (BinaryRowMessage rowMessage : rowMessages) {
                RowId lockedRowId = rowIdFuts[futNum++].join().get1();

                rowsToUpdate.put(lockedRowId.uuid(), rowMessage);
            }

            if (rowsToUpdate.isEmpty()) {
                return completedFuture(null);
            }

            // NOTE(review): as in the insert path, the locks are released only on the success path - confirm.
            return applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate, txId, full))
                    .thenApply(ignoredRes -> {
                        releaseMultiRowShortTermLocks(rowIdFuts);

                        return null;
                    });
        });
    }

    /**
     * Releases the short term locks carried as the second element of each completed lock future.
     *
     * @param lockFuts Completed futures holding a row id together with the locks to release.
     */
    private void releaseMultiRowShortTermLocks(CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] lockFuts) {
        for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut : lockFuts) {
            lockFut.join().get2()
                    .forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
        }
    }