in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [1946:2209]
private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, long leaseStartTime) {
UUID txId = request.transactionId();
ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();
List<BinaryRow> searchRows = request.binaryRows();
assert commitPartitionId != null : "Commit partition is null [type=" + request.requestType() + ']';
switch (request.requestType()) {
case RW_DELETE_EXACT_ALL: {
CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];
Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
for (int i = 0; i < searchRows.size(); i++) {
BinaryRow searchRow = searchRows.get(i);
deleteExactLockFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
if (rowId == null) {
return nullCompletedFuture();
}
if (lastCommitTime != null) {
lastCommitTimes.put(rowId.uuid(), lastCommitTime);
}
return takeLocksForDeleteExact(searchRow, rowId, row, txId);
});
}
return allOf(deleteExactLockFuts).thenCompose(ignore -> {
Map<UUID, TimedBinaryRowMessage> rowIdsToDelete = new HashMap<>();
// TODO:IGNITE-20669 Replace the result to BitSet.
Collection<BinaryRow> result = new ArrayList<>();
List<RowId> rows = new ArrayList<>();
for (int i = 0; i < searchRows.size(); i++) {
RowId lockedRowId = deleteExactLockFuts[i].join();
if (lockedRowId != null) {
rowIdsToDelete.put(lockedRowId.uuid(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
.timestamp(lastCommitTimes.get(lockedRowId.uuid()))
.build());
result.add(new NullBinaryRow());
rows.add(lockedRowId);
} else {
result.add(null);
}
}
if (rowIdsToDelete.isEmpty()) {
return completedFuture(new ReplicaResult(result, null));
}
return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
request,
rowIdsToDelete,
catalogVersion,
leaseStartTime
)
)
.thenApply(res -> new ReplicaResult(result, res));
});
}
case RW_INSERT_ALL: {
List<BinaryTuple> pks = new ArrayList<>(searchRows.size());
CompletableFuture<RowId>[] pkReadLockFuts = new CompletableFuture[searchRows.size()];
for (int i = 0; i < searchRows.size(); i++) {
BinaryTuple pk = extractPk(searchRows.get(i));
pks.add(pk);
pkReadLockFuts[i] = resolveRowByPk(pk, txId, (rowId, row, lastCommitTime) -> completedFuture(rowId));
}
return allOf(pkReadLockFuts).thenCompose(ignore -> {
// TODO:IGNITE-20669 Replace the result to BitSet.
Collection<BinaryRow> result = new ArrayList<>();
Map<RowId, BinaryRow> rowsToInsert = new HashMap<>();
Set<ByteBuffer> uniqueKeys = new HashSet<>();
for (int i = 0; i < searchRows.size(); i++) {
BinaryRow row = searchRows.get(i);
RowId lockedRow = pkReadLockFuts[i].join();
if (lockedRow == null && uniqueKeys.add(pks.get(i).byteBuffer())) {
rowsToInsert.put(new RowId(partId(), RowIdGenerator.next()), row);
result.add(new NullBinaryRow());
} else {
result.add(null);
}
}
if (rowsToInsert.isEmpty()) {
return completedFuture(new ReplicaResult(result, null));
}
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];
int idx = 0;
for (Map.Entry<RowId, BinaryRow> entry : rowsToInsert.entrySet()) {
insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
}
Map<UUID, TimedBinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream()
.collect(toMap(
e -> e.getKey().uuid(),
e -> PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
.binaryRowMessage(binaryRowMessage(e.getValue()))
.build()
));
return allOf(insertLockFuts)
.thenCompose(ignored ->
// We are inserting completely new rows - no need to cleanup anything in this case, hence empty times.
validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
)
.thenCompose(catalogVersion -> applyUpdateAllCommand(
request,
convertedMap,
catalogVersion,
leaseStartTime
)
)
.thenApply(res -> {
// Release short term locks.
for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> insertLockFut : insertLockFuts) {
insertLockFut.join().get2()
.forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
}
return new ReplicaResult(result, res);
});
});
}
case RW_UPSERT_ALL: {
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()];
BinaryTuple[] pks = new BinaryTuple[searchRows.size()];
Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
BitSet deleted = request.deleted();
// When the same key is updated multiple times within the same batch, we need to maintain operation order and apply
// only the last update. This map stores the previous searchRows index for each key.
Map<ByteBuffer, Integer> prevRowIdx = new HashMap<>();
for (int i = 0; i < searchRows.size(); i++) {
BinaryRow searchRow = searchRows.get(i);
boolean isDelete = deleted != null && deleted.get(i);
BinaryTuple pk = isDelete
? resolvePk(searchRow.tupleSlice())
: extractPk(searchRow);
pks[i] = pk;
Integer prevRowIdx0 = prevRowIdx.put(pk.byteBuffer(), i);
if (prevRowIdx0 != null) {
rowIdFuts[prevRowIdx0] = nullCompletedFuture(); // Skip previous row with the same key.
}
}
for (int i = 0; i < searchRows.size(); i++) {
if (rowIdFuts[i] != null) {
continue; // Skip previous row with the same key.
}
BinaryRow searchRow = searchRows.get(i);
boolean isDelete = deleted != null && deleted.get(i);
rowIdFuts[i] = resolveRowByPk(pks[i], txId, (rowId, row, lastCommitTime) -> {
if (isDelete && rowId == null) {
return nullCompletedFuture();
}
if (lastCommitTime != null) {
//noinspection DataFlowIssue (rowId is not null if lastCommitTime is not null)
lastCommitTimes.put(rowId.uuid(), lastCommitTime);
}
if (isDelete) {
assert row != null;
return takeLocksForDelete(row, rowId, txId)
.thenApply(id -> new IgniteBiTuple<>(id, null));
}
boolean insert = rowId == null;
RowId rowId0 = insert ? new RowId(partId(), RowIdGenerator.next()) : rowId;
return insert
? takeLocksForInsert(searchRow, rowId0, txId)
: takeLocksForUpdate(searchRow, rowId0, txId);
});
}
return allOf(rowIdFuts).thenCompose(ignore -> {
Map<UUID, TimedBinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(searchRows.size());
List<RowId> rows = new ArrayList<>();
for (int i = 0; i < searchRows.size(); i++) {
IgniteBiTuple<RowId, Collection<Lock>> locks = rowIdFuts[i].join();
if (locks == null) {
continue;
}
RowId lockedRow = locks.get1();
TimedBinaryRowMessageBuilder timedBinaryRowMessageBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY
.timedBinaryRowMessage()
.timestamp(lastCommitTimes.get(lockedRow.uuid()));
if (deleted == null || !deleted.get(i)) {
timedBinaryRowMessageBuilder.binaryRowMessage(binaryRowMessage(searchRows.get(i)));
}
rowsToUpdate.put(lockedRow.uuid(), timedBinaryRowMessageBuilder.build());
rows.add(lockedRow);
}
if (rowsToUpdate.isEmpty()) {
return completedFuture(new ReplicaResult(null, null));
}
return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
request,
rowsToUpdate,
catalogVersion,
leaseStartTime
)
)
.thenApply(res -> {
// Release short term locks.
for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> rowIdFut : rowIdFuts) {
IgniteBiTuple<RowId, Collection<Lock>> futRes = rowIdFut.join();
Collection<Lock> locks = futRes == null ? null : futRes.get2();
if (locks != null) {
locks.forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
}
}
return new ReplicaResult(null, res);
});
});
}
default: {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
format("Unknown multi request [actionType={}]", request.requestType()));
}
}
}