in modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java [662:804]
private CompletableFuture<?> processOperationRequest(
UUID senderId,
ReplicaRequest request,
ReplicaPrimacy replicaPrimacy,
@Nullable HybridTimestamp opStartTsIfDirectRo
) {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
var opId = new OperationId(senderId, req.timestamp().longValue());
return appendTxCommand(
req.transactionId(),
opId,
req.requestType(),
req.full(),
() -> processSingleEntryAction(req, replicaPrimacy.leaseStartTime())
);
} else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
var req = (ReadWriteSingleRowPkReplicaRequest) request;
var opId = new OperationId(senderId, req.timestamp().longValue());
return appendTxCommand(
req.transactionId(),
opId,
req.requestType(),
req.full(),
() -> processSingleEntryAction(req, replicaPrimacy.leaseStartTime())
);
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
var req = (ReadWriteMultiRowReplicaRequest) request;
var opId = new OperationId(senderId, req.timestamp().longValue());
return appendTxCommand(
req.transactionId(),
opId,
req.requestType(),
req.full(),
() -> processMultiEntryAction(req, replicaPrimacy.leaseStartTime())
);
} else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
var req = (ReadWriteMultiRowPkReplicaRequest) request;
var opId = new OperationId(senderId, req.timestamp().longValue());
return appendTxCommand(
req.transactionId(),
opId,
req.requestType(),
req.full(),
() -> processMultiEntryAction(req, replicaPrimacy.leaseStartTime())
);
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
var req = (ReadWriteSwapRowReplicaRequest) request;
var opId = new OperationId(senderId, req.timestamp().longValue());
return appendTxCommand(
req.transactionId(),
opId,
req.requestType(),
req.full(),
() -> processTwoEntriesAction(req, replicaPrimacy.leaseStartTime())
);
} else if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) {
var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
// Scan's request.full() has a slightly different semantics than the same field in other requests -
// it identifies an implicit transaction. Please note that request.full() is always false in the following `appendTxCommand`.
// We treat SCAN as 2pc and only switch to a 1pc mode if all table rows fit in the bucket and the transaction is implicit.
// See `req.full() && (err != null || rows.size() < req.batchSize())` condition.
// If they don't fit the bucket, the transaction is treated as 2pc.
txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta(
PENDING,
req.coordinatorId(),
req.commitPartitionId().asReplicationGroupId(),
null,
old == null ? null : old.tx(),
old == null ? null : old.isFinishedDueToTimeout()
));
var opId = new OperationId(senderId, req.timestamp().longValue());
// Implicit RW scan can be committed locally on a last batch or error.
return appendTxCommand(req.transactionId(), opId, RW_SCAN, false, () -> processScanRetrieveBatchAction(req))
.thenCompose(rows -> {
if (allElementsAreNull(rows)) {
return completedFuture(rows);
} else {
return validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
.thenApply(ignored -> rows);
}
})
.whenComplete((rows, err) -> {
if (req.full() && (err != null || rows.size() < req.batchSize())) {
releaseTxLocks(req.transactionId());
}
});
} else if (request instanceof ScanCloseReplicaRequest) {
processScanCloseAction((ScanCloseReplicaRequest) request);
return nullCompletedFuture();
} else if (request instanceof TxFinishReplicaRequest) {
assert !enabledColocation() : request;
return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request);
} else if (request instanceof WriteIntentSwitchReplicaRequest) {
return processWriteIntentSwitchAction((WriteIntentSwitchReplicaRequest) request);
} else if (request instanceof TableWriteIntentSwitchReplicaRequest) {
return processTableWriteIntentSwitchAction((TableWriteIntentSwitchReplicaRequest) request);
} else if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
return processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, replicaPrimacy.isPrimary());
} else if (request instanceof ReadOnlyMultiRowPkReplicaRequest) {
return processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request, replicaPrimacy.isPrimary());
} else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, replicaPrimacy.isPrimary());
} else if (request instanceof ReplicaSafeTimeSyncRequest) {
return processReplicaSafeTimeSyncRequest(replicaPrimacy.isPrimary());
} else if (request instanceof BuildIndexReplicaRequest) {
return buildIndexReplicaRequestHandler.handle((BuildIndexReplicaRequest) request);
} else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request, opStartTsIfDirectRo);
} else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request, opStartTsIfDirectRo);
} else if (request instanceof TxStateCommitPartitionRequest) {
assert !enabledColocation() : request;
return txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest) request);
} else if (request instanceof VacuumTxStateReplicaRequest) {
assert !enabledColocation() : request;
return vacuumTxStateReplicaRequestHandler.handle((VacuumTxStateReplicaRequest) request);
} else if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) {
assert !enabledColocation() : request;
return minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest) request);
}
// Unknown request.
throw new UnsupportedReplicaRequestException(request.getClass());
}