in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [1508:1741]
/**
 * Processes a read-write replica request that carries several rows, dispatching on the request type.
 *
 * <p>The value the returned future completes with depends on the request type:
 * <ul>
 *     <li>{@code RW_GET_ALL} - a list with one entry per requested row, in request order; the entry is {@code null}
 *     when no row was resolved for the key;</li>
 *     <li>{@code RW_DELETE_ALL}, {@code RW_DELETE_EXACT_ALL} - the request rows for which no row id was locked,
 *     i.e. the rows that were not included into the delete command;</li>
 *     <li>{@code RW_INSERT_ALL} - the request rows that were not inserted, either because the key already resolved
 *     to a row or because an earlier row of the same request had the same key;</li>
 *     <li>{@code RW_UPSERT_ALL} - {@code null}.</li>
 * </ul>
 *
 * @param request Multi-row read-write request.
 * @return Future with the result described above.
 * @throws IgniteInternalException If the request type is not one of the multi-row types handled here.
 */
private CompletableFuture<Object> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request) {
    UUID txId = request.transactionId();
    TablePartitionId committedPartitionId = request.commitPartitionId();
    boolean full = request.full();

    assert committedPartitionId != null || request.requestType() == RequestType.RW_GET_ALL
            : "Commit partition is null [type=" + request.requestType() + ']';

    // Read the rows from the request exactly once: the accessor used to be invoked for every array size, every
    // iteration and even inside a loop condition. Its cost is not visible from here (it may rebuild the collection
    // on each call), so a single call is both cheaper and guarantees all loops below see the same sequence.
    Collection<BinaryRow> searchRows = request.binaryRows();

    switch (request.requestType()) {
        case RW_GET_ALL: {
            CompletableFuture<BinaryRow>[] rowFuts = new CompletableFuture[searchRows.size()];

            int i = 0;

            for (BinaryRow searchRow : searchRows) {
                rowFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    // Nothing was resolved for this key: there is nothing to lock, report null for this position.
                    if (rowId == null) {
                        return completedFuture(null);
                    }

                    return takeLocksForGet(rowId, txId)
                            .thenApply(ignored -> row);
                });
            }

            return allOf(rowFuts)
                    .thenApply(ignored -> {
                        var result = new ArrayList<BinaryRow>(rowFuts.length);

                        // All futures are complete at this point, so join() does not block.
                        for (CompletableFuture<BinaryRow> rowFut : rowFuts) {
                            result.add(rowFut.join());
                        }

                        return result;
                    });
        }
        case RW_DELETE_ALL: {
            CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[searchRows.size()];

            int i = 0;

            for (BinaryRow searchRow : searchRows) {
                rowIdLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(null);
                    }

                    return takeLocksForDelete(row, rowId, txId);
                });
            }

            return allOf(rowIdLockFuts).thenCompose(ignore -> {
                // A null value in the map marks the row id as a removal in the update-all command.
                Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
                // Request rows that are NOT going to be deleted; this is what the caller receives.
                Collection<BinaryRow> result = new ArrayList<>();

                int futNum = 0;

                for (BinaryRow row : searchRows) {
                    RowId lockedRowId = rowIdLockFuts[futNum++].join();

                    if (lockedRowId != null) {
                        rowIdsToDelete.put(lockedRowId.uuid(), null);
                    } else {
                        result.add(row);
                    }
                }

                // Nothing to delete: skip the command entirely.
                if (rowIdsToDelete.isEmpty()) {
                    return completedFuture(result);
                }

                return applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, txId, full))
                        .thenApply(ignored -> result);
            });
        }
        case RW_DELETE_EXACT_ALL: {
            CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];

            int i = 0;

            for (BinaryRow searchRow : searchRows) {
                deleteExactLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    if (rowId == null) {
                        return completedFuture(null);
                    }

                    // Both the requested row and the resolved row are passed on; presumably the helper yields a
                    // null row id when they do not match - confirm against takeLocksForDeleteExact.
                    return takeLocksForDeleteExact(searchRow, rowId, row, txId);
                });
            }

            return allOf(deleteExactLockFuts).thenCompose(ignore -> {
                // A null value in the map marks the row id as a removal in the update-all command.
                Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
                // Request rows that are NOT going to be deleted; this is what the caller receives.
                Collection<BinaryRow> result = new ArrayList<>();

                int futNum = 0;

                for (BinaryRow row : searchRows) {
                    RowId lockedRowId = deleteExactLockFuts[futNum++].join();

                    if (lockedRowId != null) {
                        rowIdsToDelete.put(lockedRowId.uuid(), null);
                    } else {
                        result.add(row);
                    }
                }

                CompletableFuture<Object> raftFut = rowIdsToDelete.isEmpty() ? completedFuture(null)
                        : applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowIdsToDelete, txId, full));

                return raftFut.thenApply(ignored -> result);
            });
        }
        case RW_INSERT_ALL: {
            CompletableFuture<RowId>[] pkReadLockFuts = new CompletableFuture[searchRows.size()];
            CompletableFuture<BinaryTuple>[] pkTupleFuts = new CompletableFuture[searchRows.size()];

            int i = 0;

            for (BinaryRow searchRow : searchRows) {
                // Resolve the key to an existing row id (null if there is none) and, in parallel, extract the key
                // tuple that is used below to detect duplicates inside the request itself.
                pkReadLockFuts[i] = resolveRowByPk(searchRow, txId,
                        (rowId, row) -> completedFuture(rowId));
                pkTupleFuts[i] = extractKey(searchRow);
                i++;
            }

            return allOf(ArrayUtils.concat(pkReadLockFuts, pkTupleFuts)).thenCompose(ignore -> {
                // Request rows that are NOT going to be inserted; this is what the caller receives.
                Collection<BinaryRow> result = new ArrayList<>();
                Map<RowId, BinaryRow> rowsToInsert = new HashMap<>();
                // Key bytes already accepted for insertion from this request.
                Set<ByteBuffer> uniqueKeys = new HashSet<>();

                int futNum = 0;

                for (BinaryRow row : searchRows) {
                    RowId lockedRow = pkReadLockFuts[futNum].join();

                    if (lockedRow != null) {
                        // The key already resolves to a row: reject the insert.
                        result.add(row);
                    } else {
                        BinaryTuple keyTuple = pkTupleFuts[futNum].join();
                        ByteBuffer keyToCheck = keyTuple.byteBuffer();

                        if (uniqueKeys.add(keyToCheck)) {
                            rowsToInsert.put(new RowId(partId(), UUID.randomUUID()), row);
                        } else {
                            // Same key appeared earlier in this request: only the first occurrence is inserted.
                            result.add(row);
                        }
                    }

                    futNum++;
                }

                if (rowsToInsert.isEmpty()) {
                    return completedFuture(result);
                }

                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];

                int idx = 0;

                for (Map.Entry<RowId, BinaryRow> entry : rowsToInsert.entrySet()) {
                    insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
                }

                // Convert the rows into the message form expected by the update-all command, keyed by row UUID.
                Map<UUID, BinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream()
                        .collect(Collectors.toMap(
                                e -> e.getKey().uuid(),
                                e -> MSG_FACTORY.binaryRowMessage()
                                        .binaryTuple(e.getValue().tupleSlice())
                                        .schemaVersion(e.getValue().schemaVersion())
                                        .build()
                        ));

                return allOf(insertLockFuts)
                        .thenCompose(ignored -> applyUpdateAllCommand(
                                updateAllCommand(committedPartitionId, convertedMap, txId, full)))
                        .thenApply(ignored -> {
                            // Release short term locks.
                            // NOTE(review): thenApply runs only on normal completion, so this release happens only
                            // when the command succeeds - confirm the failure path releases these locks elsewhere.
                            for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> insertLockFut : insertLockFuts) {
                                insertLockFut.join().get2()
                                        .forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                            }

                            return result;
                        });
            });
        }
        case RW_UPSERT_ALL: {
            CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()];

            int i = 0;

            for (BinaryRow searchRow : searchRows) {
                rowIdFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
                    // No existing row for the key means an insert under a freshly generated row id.
                    // NOTE(review): unlike RW_INSERT_ALL there is no de-duplication by key here, so two request
                    // rows with the same key that both resolve to no row get distinct row ids - confirm that
                    // callers guarantee unique keys.
                    boolean insert = rowId == null;

                    RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId;

                    return insert
                            ? takeLocksForInsert(searchRow, rowId0, txId)
                            : takeLocksForUpdate(searchRow, rowId0, txId);
                });
            }

            return allOf(rowIdFuts).thenCompose(ignore -> {
                Map<UUID, BinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(request.binaryRowMessages().size());

                int futNum = 0;

                // Row messages are paired with the lock futures by position.
                for (BinaryRowMessage row : request.binaryRowMessages()) {
                    RowId lockedRow = rowIdFuts[futNum++].join().get1();

                    rowsToUpdate.put(lockedRow.uuid(), row);
                }

                if (rowsToUpdate.isEmpty()) {
                    return completedFuture(null);
                }

                return applyUpdateAllCommand(updateAllCommand(committedPartitionId, rowsToUpdate, txId, full))
                        .thenApply(ignored -> {
                            // Release short term locks.
                            // NOTE(review): thenApply runs only on normal completion, so this release happens only
                            // when the command succeeds - confirm the failure path releases these locks elsewhere.
                            for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> rowIdFut : rowIdFuts) {
                                rowIdFut.join().get2()
                                        .forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                            }

                            return null;
                        });
            });
        }
        default: {
            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                    format("Unknown multi request [actionType={}]", request.requestType()));
        }
    }
}