in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [2641:2831]
/**
 * Processes a read-write request that targets a single row, dispatching on {@code request.requestType()}.
 *
 * <p>Every supported request type first resolves the row by the primary key extracted from the request's binary row, then
 * takes locks, validates the write against the schema, and applies an update command. The first component of the returned
 * {@link ReplicaResult} depends on the request type:
 * <ul>
 *     <li>{@code RW_DELETE_EXACT} - {@code false} if no row was resolved or {@code takeLocksForDeleteExact} produced
 *     {@code null}, {@code true} otherwise;</li>
 *     <li>{@code RW_INSERT} - {@code false} if a row was already resolved for the key, {@code true} otherwise;</li>
 *     <li>{@code RW_UPSERT} - always {@code null};</li>
 *     <li>{@code RW_GET_AND_UPSERT} - the row handed over by {@code resolveRowByPk} (may be {@code null});</li>
 *     <li>{@code RW_GET_AND_REPLACE} - {@code null} if no row was resolved, otherwise the row handed over by
 *     {@code resolveRowByPk};</li>
 *     <li>{@code RW_REPLACE_IF_EXIST} - {@code false} if no row was resolved, {@code true} otherwise.</li>
 * </ul>
 * The second component is whatever {@code applyUpdateCommand} produced, or {@code null} when the operation short-circuits
 * without applying a command.
 *
 * <p>NOTE(review): short-term locks are released in a {@code thenApply} stage, i.e. only when the preceding stages complete
 * normally. On exceptional completion this method does not release them - presumably they go away together with the rest of
 * the transaction's locks; confirm.
 *
 * @param request Read-write single row replica request.
 * @param leaseStartTime Lease start time, passed through to {@code applyUpdateCommand}.
 * @return Future that completes with the result of the operation.
 * @throws IgniteInternalException If the request type is not one of the types handled here.
 */
private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, long leaseStartTime) {
    UUID txId = request.transactionId();
    BinaryRow searchRow = request.binaryRow();

    // Check the raw request field before it is dereferenced below: otherwise a request without a commit partition fails with
    // a bare NullPointerException and the descriptive assertion message is never reached.
    assert request.commitPartitionId() != null : "Commit partition is null [type=" + request.requestType() + ']';

    // The converted id is not read further in this method; the conversion and its check are kept as they were so that
    // behaviour with assertions disabled is unchanged.
    ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();

    assert commitPartitionId != null : "Commit partition is null [type=" + request.requestType() + ']';

    switch (request.requestType()) {
        case RW_DELETE_EXACT: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                // Nothing is stored under this key, so there is nothing to delete.
                if (rowId == null) {
                    return completedFuture(new ReplicaResult(false, null));
                }

                return takeLocksForDeleteExact(searchRow, rowId, row, txId)
                        .thenCompose(validatedRowId -> {
                            // A null row id from the lock step is reported as "not deleted".
                            if (validatedRowId == null) {
                                return completedFuture(new ReplicaResult(false, null));
                            }

                            return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                    .thenCompose(catalogVersion -> awaitCleanup(validatedRowId, catalogVersion))
                                    .thenCompose(
                                            catalogVersion -> applyUpdateCommand(
                                                    request,
                                                    validatedRowId.uuid(),
                                                    // A null row is passed for the delete.
                                                    null,
                                                    lastCommitTime,
                                                    catalogVersion,
                                                    leaseStartTime
                                            )
                                    )
                                    .thenApply(res -> new ReplicaResult(true, res));
                        });
            });
        }
        case RW_INSERT: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                // A row already exists for this key, so the insert is rejected.
                if (rowId != null) {
                    return completedFuture(new ReplicaResult(false, null));
                }

                // Allocate a fresh row id in this partition for the new row.
                RowId rowId0 = new RowId(partId(), RowIdGenerator.next());

                return takeLocksForInsert(searchRow, rowId0, txId)
                        .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                .thenCompose(
                                        catalogVersion -> applyUpdateCommand(
                                                request,
                                                rowId0.uuid(),
                                                searchRow,
                                                lastCommitTime,
                                                catalogVersion,
                                                leaseStartTime
                                        )
                                )
                                // Carry the acquired locks alongside the command result so they can be released below.
                                .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                        .thenApply(tuple -> {
                            // Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                            return new ReplicaResult(true, tuple.get1());
                        });
            });
        }
        case RW_UPSERT: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                // No resolved row means the upsert acts as an insert with a freshly generated row id.
                boolean insert = rowId == null;

                RowId rowId0 = insert ? new RowId(partId(), RowIdGenerator.next()) : rowId;

                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert
                        ? takeLocksForInsert(searchRow, rowId0, txId)
                        : takeLocksForUpdate(searchRow, rowId0, txId);

                return lockFut
                        .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                // NOTE(review): the original rowId is passed here, which is null on the insert path - presumably
                                // awaitCleanup tolerates null; confirm.
                                .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                .thenCompose(
                                        catalogVersion -> applyUpdateCommand(
                                                request,
                                                rowId0.uuid(),
                                                searchRow,
                                                lastCommitTime,
                                                catalogVersion,
                                                leaseStartTime
                                        )
                                )
                                // Carry the acquired locks alongside the command result so they can be released below.
                                .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                        .thenApply(tuple -> {
                            // Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                            return new ReplicaResult(null, tuple.get1());
                        });
            });
        }
        case RW_GET_AND_UPSERT: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                // No resolved row means the upsert acts as an insert with a freshly generated row id.
                boolean insert = rowId == null;

                RowId rowId0 = insert ? new RowId(partId(), RowIdGenerator.next()) : rowId;

                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert
                        ? takeLocksForInsert(searchRow, rowId0, txId)
                        : takeLocksForUpdate(searchRow, rowId0, txId);

                return lockFut
                        .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                // NOTE(review): the original rowId is passed here, which is null on the insert path - presumably
                                // awaitCleanup tolerates null; confirm.
                                .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                .thenCompose(
                                        catalogVersion -> applyUpdateCommand(
                                                request,
                                                rowId0.uuid(),
                                                searchRow,
                                                lastCommitTime,
                                                catalogVersion,
                                                leaseStartTime
                                        )
                                )
                                // Carry the acquired locks alongside the command result so they can be released below.
                                .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                        .thenApply(tuple -> {
                            // Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                            // The row resolved before the write is returned to the caller.
                            return new ReplicaResult(row, tuple.get1());
                        });
            });
        }
        case RW_GET_AND_REPLACE: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                // Nothing to replace: report a null row.
                if (rowId == null) {
                    return completedFuture(new ReplicaResult(null, null));
                }

                return takeLocksForUpdate(searchRow, rowId, txId)
                        .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                .thenCompose(
                                        catalogVersion -> applyUpdateCommand(
                                                request,
                                                rowId.uuid(),
                                                searchRow,
                                                lastCommitTime,
                                                catalogVersion,
                                                leaseStartTime
                                        )
                                )
                                // Carry the acquired locks alongside the command result so they can be released below.
                                .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                        .thenApply(tuple -> {
                            // Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                            // The row resolved before the write is returned to the caller.
                            return new ReplicaResult(row, tuple.get1());
                        });
            });
        }
        case RW_REPLACE_IF_EXIST: {
            return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                // Nothing to replace: report failure.
                if (rowId == null) {
                    return completedFuture(new ReplicaResult(false, null));
                }

                return takeLocksForUpdate(searchRow, rowId, txId)
                        .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                .thenCompose(
                                        catalogVersion -> applyUpdateCommand(
                                                request,
                                                rowId.uuid(),
                                                searchRow,
                                                lastCommitTime,
                                                catalogVersion,
                                                leaseStartTime
                                        )
                                )
                                // Carry the acquired locks alongside the command result so they can be released below.
                                .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                        .thenApply(tuple -> {
                            // Release short term locks.
                            tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                            return new ReplicaResult(true, tuple.get1());
                        });
            });
        }
        default: {
            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                    format("Unknown single request [actionType={}]", request.requestType()));
        }
    }
}