in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [1852:2019]
/**
 * Processes a read-write request that targets a single row, dispatching on the request type.
 *
 * <p>The row is first looked up by primary key through {@code resolveRowByPk}. The branch for the request type then takes
 * the matching locks and, for modifying requests, applies a command built by {@code updateCommand}. The returned future
 * completes with:
 * <ul>
 *     <li>{@code RW_GET} - the resolved row, or {@code null} when no row id was resolved;</li>
 *     <li>{@code RW_DELETE} - {@code false} when no row id was resolved, {@code true} once the removal is applied;</li>
 *     <li>{@code RW_GET_AND_DELETE} - {@code null} when no row id was resolved, otherwise the resolved row once the removal
 *     is applied;</li>
 *     <li>{@code RW_DELETE_EXACT} - {@code false} when no row id was resolved or {@code takeLocksForDeleteExact} yields a
 *     {@code null} row id, {@code true} once the removal is applied;</li>
 *     <li>{@code RW_INSERT} - {@code false} when a row id was resolved (nothing is written), {@code true} once the row is
 *     written under a freshly generated row id;</li>
 *     <li>{@code RW_UPSERT} - {@code null} once the row is written;</li>
 *     <li>{@code RW_GET_AND_UPSERT} - the row resolved before the write;</li>
 *     <li>{@code RW_GET_AND_REPLACE} - {@code null} when no row id was resolved, otherwise the row resolved before the
 *     write;</li>
 *     <li>{@code RW_REPLACE_IF_EXIST} - {@code false} when no row id was resolved, {@code true} once the row is written.</li>
 * </ul>
 *
 * @param request Read-write single row replica request.
 * @return Future that completes with the result described above.
 * @throws IgniteInternalException If the request type is not one of the types handled here.
 */
private CompletableFuture<Object> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request) {
    UUID txId = request.transactionId();
    BinaryRow searchRow = request.binaryRow();
    TablePartitionId commitPartitionId = request.commitPartitionId();
    boolean full = request.full();
    RequestType requestType = request.requestType();

    // Only a plain get is allowed to arrive without a commit partition.
    assert commitPartitionId != null || requestType == RequestType.RW_GET :
            "Commit partition is null [type=" + requestType + ']';

    switch (requestType) {
        case RW_GET: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                if (foundRowId == null) {
                    return completedFuture(null);
                }

                return takeLocksForGet(foundRowId, txId).thenApply(locked -> foundRow);
            });
        }
        case RW_DELETE: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                if (foundRowId == null) {
                    return completedFuture(false);
                }

                // A null row in the update command stands for the removal.
                return takeLocksForDelete(foundRow, foundRowId, txId)
                        .thenCompose(locked -> applyUpdateCommand(
                                updateCommand(commitPartitionId, foundRowId.uuid(), null, txId, full)))
                        .thenApply(applied -> true);
            });
        }
        case RW_GET_AND_DELETE: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                if (foundRowId == null) {
                    return completedFuture(null);
                }

                return takeLocksForDelete(foundRow, foundRowId, txId)
                        .thenCompose(locked -> applyUpdateCommand(
                                updateCommand(commitPartitionId, foundRowId.uuid(), null, txId, full)))
                        .thenApply(applied -> foundRow);
            });
        }
        case RW_DELETE_EXACT: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                if (foundRowId == null) {
                    return completedFuture(false);
                }

                return takeLocksForDeleteExact(searchRow, foundRowId, foundRow, txId)
                        .thenCompose(rowIdToDelete -> {
                            if (rowIdToDelete != null) {
                                return applyUpdateCommand(
                                        updateCommand(commitPartitionId, rowIdToDelete.uuid(), null, txId, full))
                                        .thenApply(applied -> true);
                            }

                            // The lock step produced no row id, so nothing is removed.
                            return completedFuture(false);
                        });
            });
        }
        case RW_INSERT: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                if (foundRowId != null) {
                    return completedFuture(false);
                }

                RowId newRowId = new RowId(partId(), UUID.randomUUID());

                return applyUpdateAndReleaseShortTermLocks(
                        takeLocksForInsert(searchRow, newRowId, txId), commitPartitionId, newRowId, searchRow, txId, full, true);
            });
        }
        case RW_UPSERT: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                boolean insert = foundRowId == null;

                RowId targetRowId = insert ? new RowId(partId(), UUID.randomUUID()) : foundRowId;

                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert
                        ? takeLocksForInsert(searchRow, targetRowId, txId)
                        : takeLocksForUpdate(searchRow, targetRowId, txId);

                return applyUpdateAndReleaseShortTermLocks(lockFut, commitPartitionId, targetRowId, searchRow, txId, full, null);
            });
        }
        case RW_GET_AND_UPSERT: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                boolean insert = foundRowId == null;

                RowId targetRowId = insert ? new RowId(partId(), UUID.randomUUID()) : foundRowId;

                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert
                        ? takeLocksForInsert(searchRow, targetRowId, txId)
                        : takeLocksForUpdate(searchRow, targetRowId, txId);

                return applyUpdateAndReleaseShortTermLocks(
                        lockFut, commitPartitionId, targetRowId, searchRow, txId, full, foundRow);
            });
        }
        case RW_GET_AND_REPLACE: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                if (foundRowId == null) {
                    return completedFuture(null);
                }

                return applyUpdateAndReleaseShortTermLocks(
                        takeLocksForUpdate(searchRow, foundRowId, txId), commitPartitionId, foundRowId, searchRow, txId, full,
                        foundRow);
            });
        }
        case RW_REPLACE_IF_EXIST: {
            return resolveRowByPk(searchRow, txId, (foundRowId, foundRow) -> {
                if (foundRowId == null) {
                    return completedFuture(false);
                }

                return applyUpdateAndReleaseShortTermLocks(
                        takeLocksForUpdate(searchRow, foundRowId, txId), commitPartitionId, foundRowId, searchRow, txId, full,
                        true);
            });
        }
        default: {
            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                    format("Unknown single request [actionType={}]", requestType));
        }
    }
}

/**
 * Waits for the given lock future, applies an update command that writes {@code row} under {@code rowId}, and then releases
 * the locks carried in the second component of the lock tuple.
 *
 * <p>The locks are released only after the update command future completes normally; the command itself is built lazily,
 * after the lock future completes.
 *
 * @param lockFut Future of the lock acquisition; the second tuple component holds the short term locks to release.
 * @param commitPartitionId Commit partition id passed to the update command.
 * @param rowId Id of the row to write.
 * @param row Row to write.
 * @param txId Transaction id.
 * @param full Value of the request's {@code full} flag, passed through to the update command.
 * @param result Value the returned future completes with.
 * @param <T> Type of the result.
 * @return Future completed with {@code result} after the update is applied and the short term locks are released.
 */
private <T> CompletableFuture<T> applyUpdateAndReleaseShortTermLocks(
        CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut,
        TablePartitionId commitPartitionId,
        RowId rowId,
        BinaryRow row,
        UUID txId,
        boolean full,
        T result
) {
    return lockFut
            .thenCompose(rowIdLock -> applyUpdateCommand(updateCommand(commitPartitionId, rowId.uuid(), row, txId, full))
                    .thenApply(applied -> rowIdLock))
            .thenApply(rowIdLock -> {
                // Release short term locks.
                rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                return result;
            });
}