in src/storage/service/StorageOperator.cc [82:231]
// Serves one batch read request.
//
// Phases, in order:
//   1. For every IO in the batch, look up its target in a snapshot of the target map and
//      check that the target is up to date and that the client buffer is large enough.
//   2. Acquire a local buffer of `alignedLength()` bytes for every IO from the rdmabuf pool.
//   3. Either finish every job immediately (BYPASS_DISKIO) or enqueue the jobs to the AIO
//      read worker in slices of `batch_read_job_split_size`, then wait for the batch.
//   4. Deliver the data: copy it into the response (SEND_DATA_INLINE), or post an RDMA write
//      transmission unless BYPASS_RDMAXMIT is set.
//
// @param requestCtx  per-request context; its debug flags drive the fault injection points.
// @param req         the batch read request (payloads, checksum type, feature flags).
// @param ctx         call context providing the transport and the write transmission.
// @return the response with one result per payload. An error is returned instead when target
//         lookup, target state, buffer size validation, buffer allocation, or the RDMA
//         socket/semaphore lookup fails. A failed RDMA post does NOT fail the request: the
//         response is still returned, with every result's `lengthInfo` set to the post error.
CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &requestCtx,
                                                   const BatchReadReq &req,
                                                   serde::CallContext &ctx) {
  XLOGF(DBG5, "Received batch read request {} with tag {} and {} IOs", fmt::ptr(&req), req.tag, req.payloads.size());

  auto recordGuard = storageReqReadRecorder.record(monitor::instanceTagSet(std::to_string(req.userInfo.uid)));

  // ---- Phase 1: resolve and validate the target of every IO. ----
  auto prepareTargetRecordGuard = storageReadPrepareTarget.record();
  auto snapshot = components_.targetMap.snapshot();

  auto batchSize = req.payloads.size();
  BatchReadRsp rsp;
  rsp.results.resize(batchSize);
  BatchReadJob batch(req.payloads, rsp.results, req.checksumType);
  storageReadCount.addSample(batchSize);
  storageReqReadSize.addSample(batchSize);

  size_t totalLength = 0;
  size_t totalHeadLength = 0;
  size_t totalTailLength = 0;
  for (AioReadJobIterator it(&batch); it; it++) {
    // get target for batch read, need check public and local state.
    auto targetResult = FAULT_INJECTION_POINT(
        requestCtx.debugFlags.injectServerError(),
        makeError(StorageCode::kChainVersionMismatch),
        snapshot->getByChainId(it->readIO().key.vChainId, config_.batch_read_ignore_chain_version()));
    if (UNLIKELY(!targetResult)) {
      auto msg = fmt::format("read get target failed, req {}, error {}", it->readIO(), targetResult.error());
      XLOG(ERR, msg);
      co_return makeError(std::move(targetResult.error()));
    }
    auto target = std::move(*targetResult);
    if (UNLIKELY(!target->upToDate())) {
      auto msg = fmt::format("read target is not upToDate, req {}, target {}", it->readIO(), *target);
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kTargetStateInvalid, std::move(msg));
    }
    it->state().storageTarget = target->storageTarget.get();

    totalLength += it->readIO().length;
    totalHeadLength += it->state().headLength;
    totalTailLength += it->state().tailLength;

    // The requested length must fit into the buffer described by the request.
    if (FAULT_INJECTION_POINT(requestCtx.debugFlags.injectServerError(),
                              true,
                              UNLIKELY(it->readIO().length > it->readIO().rdmabuf.size()))) {
      auto msg = fmt::format("invalid read buffer size {}", it->readIO());
      XLOG(ERR, msg);
      co_return makeError(StatusCode::kInvalidArg, std::move(msg));
    }
    it->state().readUncommitted = BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::ALLOW_READ_UNCOMMITTED);
  }

  totalReadBytes_ += totalLength;
  totalReadIOs_ += batchSize;
  storageReadBytes.addSample(totalLength);
  aioTotalHeadLength.addSample(totalHeadLength);
  aioTotalTailLength.addSample(totalTailLength);
  aioTotalAlignedLength.addSample(totalLength + totalHeadLength + totalTailLength);
  prepareTargetRecordGuard.report(true);

  // ---- Phase 2: acquire a local buffer for every IO. ----
  auto prepareBufferRecordGuard = storageReadPrepareBuffer.record();
  auto buffer = components_.rdmabufPool.get();
  for (AioReadJobIterator it(&batch); it; it++) {
    auto &job = *it;
    // Try the synchronous allocation first; only suspend the coroutine when it fails.
    auto allocateResult = buffer.tryAllocate(job.alignedLength());
    if (UNLIKELY(!allocateResult)) {
      allocateResult = co_await buffer.allocate(job.alignedLength());
    }
    if (UNLIKELY(!allocateResult)) {
      auto msg = fmt::format("read allocate buffer failed, req {}, length {}", job.readIO(), job.alignedLength());
      XLOG(ERR, msg);
      co_return makeError(RPCCode::kRDMANoBuf, std::move(msg));
    }
    job.state().localbuf = std::move(*allocateResult);
    job.state().bufferIndex = buffer.index();
  }
  prepareBufferRecordGuard.report(true);

  // ---- Phase 3: run the reads (or skip them) and wait for the batch. ----
  if (BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::BYPASS_DISKIO)) {
    // No job is enqueued: report the requested length and finish each job right away.
    for (AioReadJobIterator it(&batch); it; it++) {
      it->result().lengthInfo = it->readIO().length;
      batch.finish(&*it);
    }
  } else {
    // Named distinctly so it does not shadow the request-level `recordGuard` above.
    auto enqueueRecordGuard = storageAioEnqueueRecorder.record();
    auto splitSize = config_.batch_read_job_split_size();
    if (UNLIKELY(splitSize == 0)) {
      // A zero step would never advance `start` and the loop below would spin forever.
      // Treat it as "do not split" and enqueue the whole batch as a single slice.
      splitSize = static_cast<decltype(splitSize)>(batchSize);
    }
    for (uint32_t start = 0; start < batchSize; start += splitSize) {
      co_await components_.aioReadWorker.enqueue(AioReadJobIterator(&batch, start, splitSize));
    }
    enqueueRecordGuard.report(true);
  }

  auto waitAioAndPostRecordGuard = storageWaitAioAndPostRecorder.record();
  auto waitAioRecordGuard = storageWaitAioRecorder.record();
  co_await batch.complete();
  waitAioRecordGuard.report(true);

  // ---- Phase 4: deliver the data to the client. ----
  if (BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::SEND_DATA_INLINE)) {
    batch.copyToRespBuffer(rsp.inlinebuf.data);
  } else if (!BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::BYPASS_RDMAXMIT)) {
    auto ibSocket = ctx.transport()->ibSocket();
    if (UNLIKELY(ibSocket == nullptr)) {
      XLOGF(ERR, "batch read no RDMA socket");
      co_return makeError(StatusCode::kInvalidArg, "batch read no RDMA socket");
    }

    auto waitBatchRecordGuard = storageWaitBatchRecorder.record();
    auto writeBatch = ctx.writeTransmission();
    batch.addBufferToBatch(writeBatch);
    waitBatchRecordGuard.report(true);

    // The semaphore is looked up by the id of the IB device behind this socket.
    auto rdmaSemaphoreIter = concurrentRdmaWriteSemaphore_.find(ibSocket->device()->id());
    if (rdmaSemaphoreIter == concurrentRdmaWriteSemaphore_.end()) {
      XLOGF(CRITICAL,
            "Cannot find RDMA operation semaphore for IB device #{} {}",
            ibSocket->device()->id(),
            ibSocket->device()->name());
      co_return makeError(RPCCode::kIBDeviceNotFound);
    }

    // applyTransmission() runs either before or after waiting on the semaphore, depending on
    // configuration, and only when the packet has controlRDMA() set and the timeout is non-zero.
    auto RDMATransmissionReqTimeout = config_.rdma_transmission_req_timeout();
    bool applyTransmissionBeforeGettingSemaphore = config_.apply_transmission_before_getting_semaphore();
    if (ctx.packet().controlRDMA() && RDMATransmissionReqTimeout != 0_ms && applyTransmissionBeforeGettingSemaphore) {
      co_await writeBatch.applyTransmission(RDMATransmissionReqTimeout);
    }

    auto ibdevTagSet = monitor::instanceTagSet(ibSocket->device()->name());
    auto waitSemRecordGuard = storageWaitSemRecorder.record(ibdevTagSet);
    SemaphoreGuard guard(rdmaSemaphoreIter->second);
    co_await guard.coWait();
    waitSemRecordGuard.report(true);

    if (ctx.packet().controlRDMA() && RDMATransmissionReqTimeout != 0_ms && !applyTransmissionBeforeGettingSemaphore) {
      co_await writeBatch.applyTransmission(RDMATransmissionReqTimeout);
    }

    auto waitPostRecordGuard = storageWaitPostRecorder.record(ibdevTagSet);
    auto postResult = FAULT_INJECTION_POINT(requestCtx.debugFlags.injectServerError(),
                                            makeError(RPCCode::kRDMAPostFailed),
                                            (co_await writeBatch.post()));
    if (UNLIKELY(!postResult)) {
      // Every IO in the batch gets the post error. Take a fresh copy per IO: moving
      // postResult.error() inside the loop would hand all results after the first a
      // moved-from status.
      for (AioReadJobIterator it(&batch); it; it++) {
        auto perIoError = postResult.error();
        it->result().lengthInfo = makeError(std::move(perIoError));
      }
    } else {
      waitPostRecordGuard.succ();
    }
  }

  waitAioAndPostRecordGuard.report(true);
  recordGuard.succ();
  co_return rsp;
}