CoTryTask StorageOperator::batchRead()

in src/storage/service/StorageOperator.cc [82:231]


// Serves a batch of chunk reads described by `req.payloads`.
//
// Stages:
//   1. Prepare targets: resolve each IO's chain to a target through a snapshot of the target map
//      (the chain version is ignored when `batch_read_ignore_chain_version` is set). The whole request
//      fails if the lookup fails, if the target is not up to date (kTargetStateInvalid), or if the
//      requested length exceeds the IO's `rdmabuf` size (kInvalidArg).
//   2. Prepare buffers: allocate a local buffer of `alignedLength()` per IO from the RDMA buffer pool,
//      first via `tryAllocate`, falling back to an awaited `allocate`; failure returns kRDMANoBuf.
//   3. Read: with BYPASS_DISKIO every job is finished immediately and reports the requested length;
//      otherwise jobs are enqueued to the AIO read worker in slices of `batch_read_job_split_size`.
//   4. Wait for all jobs, then deliver the data: copied into `rsp.inlinebuf` with SEND_DATA_INLINE,
//      or written through `ctx.writeTransmission()` unless BYPASS_RDMAXMIT is set. RDMA writes are
//      throttled by a semaphore looked up by IB device id.
//
// Per-IO outcomes are stored in `rsp.results`. A failed RDMA post does not fail the request; instead
// every result's `lengthInfo` is set to the post error.
CoTryTask<BatchReadRsp> StorageOperator::batchRead(ServiceRequestContext &requestCtx,
                                                   const BatchReadReq &req,
                                                   serde::CallContext &ctx) {
  XLOGF(DBG5, "Received batch read request {} with tag {} and {} IOs", fmt::ptr(&req), req.tag, req.payloads.size());

  auto recordGuard = storageReqReadRecorder.record(monitor::instanceTagSet(std::to_string(req.userInfo.uid)));

  auto prepareTargetRecordGuard = storageReadPrepareTarget.record();
  auto snapshot = components_.targetMap.snapshot();
  auto batchSize = req.payloads.size();
  BatchReadRsp rsp;
  rsp.results.resize(batchSize);
  BatchReadJob batch(req.payloads, rsp.results, req.checksumType);
  storageReadCount.addSample(batchSize);
  storageReqReadSize.addSample(batchSize);

  // The feature flag is request-wide, so evaluate it once instead of per IO.
  const bool readUncommitted = BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::ALLOW_READ_UNCOMMITTED);

  size_t totalLength = 0;
  size_t totalHeadLength = 0;
  size_t totalTailLength = 0;
  for (AioReadJobIterator it(&batch); it; it++) {
    // get target for batch read, need check public and local state.
    auto targetResult = FAULT_INJECTION_POINT(
        requestCtx.debugFlags.injectServerError(),
        makeError(StorageCode::kChainVersionMismatch),
        snapshot->getByChainId(it->readIO().key.vChainId, config_.batch_read_ignore_chain_version()));
    if (UNLIKELY(!targetResult)) {
      auto msg = fmt::format("read get target failed, req {}, error {}", it->readIO(), targetResult.error());
      XLOG(ERR, msg);
      co_return makeError(std::move(targetResult.error()));
    }
    auto target = std::move(*targetResult);
    if (UNLIKELY(!target->upToDate())) {
      auto msg = fmt::format("read target is not upToDate, req {}, target {}", it->readIO(), *target);
      XLOG(ERR, msg);
      co_return makeError(StorageCode::kTargetStateInvalid, std::move(msg));
    }
    it->state().storageTarget = target->storageTarget.get();
    totalLength += it->readIO().length;
    totalHeadLength += it->state().headLength;
    totalTailLength += it->state().tailLength;
    // The requested length must fit in the buffer described by the IO.
    if (FAULT_INJECTION_POINT(requestCtx.debugFlags.injectServerError(),
                              true,
                              UNLIKELY(it->readIO().length > it->readIO().rdmabuf.size()))) {
      auto msg = fmt::format("invalid read buffer size {}", it->readIO());
      XLOG(ERR, msg);
      co_return makeError(StatusCode::kInvalidArg, std::move(msg));
    }
    it->state().readUncommitted = readUncommitted;
  }
  totalReadBytes_ += totalLength;
  totalReadIOs_ += batchSize;
  storageReadBytes.addSample(totalLength);
  aioTotalHeadLength.addSample(totalHeadLength);
  aioTotalTailLength.addSample(totalTailLength);
  aioTotalAlignedLength.addSample(totalLength + totalHeadLength + totalTailLength);
  prepareTargetRecordGuard.report(true);

  auto prepareBufferRecordGuard = storageReadPrepareBuffer.record();
  auto buffer = components_.rdmabufPool.get();
  for (AioReadJobIterator it(&batch); it; it++) {
    auto &job = *it;
    // Try the non-awaiting path first; only suspend when it fails.
    auto allocateResult = buffer.tryAllocate(job.alignedLength());
    if (UNLIKELY(!allocateResult)) {
      allocateResult = co_await buffer.allocate(job.alignedLength());
    }
    if (UNLIKELY(!allocateResult)) {
      auto msg = fmt::format("read allocate buffer failed, req {}, length {}", job.readIO(), job.alignedLength());
      XLOG(ERR, msg);
      co_return makeError(RPCCode::kRDMANoBuf, std::move(msg));
    }
    job.state().localbuf = std::move(*allocateResult);
    job.state().bufferIndex = buffer.index();
  }
  prepareBufferRecordGuard.report(true);

  if (BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::BYPASS_DISKIO)) {
    // Skip disk IO entirely: report the requested length and finish each job immediately.
    for (AioReadJobIterator it(&batch); it; it++) {
      it->result().lengthInfo = it->readIO().length;
      batch.finish(&*it);
    }
  } else {
    auto recordGuard = storageAioEnqueueRecorder.record();
    // NOTE(review): a split size of 0 would never advance `start`; presumably the config
    // validator rejects it — confirm.
    auto splitSize = config_.batch_read_job_split_size();
    for (uint32_t start = 0; start < batchSize; start += splitSize) {
      co_await components_.aioReadWorker.enqueue(AioReadJobIterator(&batch, start, splitSize));
    }
    recordGuard.report(true);
  }

  auto waitAioAndPostRecordGuard = storageWaitAioAndPostRecorder.record();
  auto waitAioRecordGuard = storageWaitAioRecorder.record();
  co_await batch.complete();
  waitAioRecordGuard.report(true);

  if (BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::SEND_DATA_INLINE)) {
    batch.copyToRespBuffer(rsp.inlinebuf.data);
  } else if (!BITFLAGS_CONTAIN(req.featureFlags, FeatureFlags::BYPASS_RDMAXMIT)) {
    auto ibSocket = ctx.transport()->ibSocket();
    if (UNLIKELY(ibSocket == nullptr)) {
      XLOGF(ERR, "batch read no RDMA socket");
      co_return makeError(StatusCode::kInvalidArg, "batch read no RDMA socket");
    }

    auto waitBatchRecordGuard = storageWaitBatchRecorder.record();
    auto writeBatch = ctx.writeTransmission();
    batch.addBufferToBatch(writeBatch);
    waitBatchRecordGuard.report(true);

    auto rdmaSemaphoreIter = concurrentRdmaWriteSemaphore_.find(ibSocket->device()->id());
    if (rdmaSemaphoreIter == concurrentRdmaWriteSemaphore_.end()) {
      XLOGF(CRITICAL,
            "Cannot find RDMA operation semaphore for IB device #{} {}",
            ibSocket->device()->id(),
            ibSocket->device()->name());
      co_return makeError(RPCCode::kIBDeviceNotFound);
    }

    // Transmission control (when the packet requests it and a timeout is configured) is applied
    // either before or after acquiring the device semaphore, depending on config.
    auto RDMATransmissionReqTimeout = config_.rdma_transmission_req_timeout();
    bool applyTransmissionBeforeGettingSemaphore = config_.apply_transmission_before_getting_semaphore();
    if (ctx.packet().controlRDMA() && RDMATransmissionReqTimeout != 0_ms && applyTransmissionBeforeGettingSemaphore) {
      co_await writeBatch.applyTransmission(RDMATransmissionReqTimeout);
    }

    auto ibdevTagSet = monitor::instanceTagSet(ibSocket->device()->name());
    auto waitSemRecordGuard = storageWaitSemRecorder.record(ibdevTagSet);
    SemaphoreGuard guard(rdmaSemaphoreIter->second);
    co_await guard.coWait();
    waitSemRecordGuard.report(true);

    if (ctx.packet().controlRDMA() && RDMATransmissionReqTimeout != 0_ms && !applyTransmissionBeforeGettingSemaphore) {
      co_await writeBatch.applyTransmission(RDMATransmissionReqTimeout);
    }

    auto waitPostRecordGuard = storageWaitPostRecorder.record(ibdevTagSet);
    auto postResult = FAULT_INJECTION_POINT(requestCtx.debugFlags.injectServerError(),
                                            makeError(RPCCode::kRDMAPostFailed),
                                            (co_await writeBatch.post()));
    if (UNLIKELY(!postResult)) {
      XLOGF(ERR, "batch read RDMA post failed, error {}", postResult.error());
      // Copy (not move) the error into every result: moving inside the loop would leave all but
      // the first IO holding a moved-from status.
      for (AioReadJobIterator it(&batch); it; it++) {
        it->result().lengthInfo = makeError(postResult.error());
      }
    } else {
      waitPostRecordGuard.succ();
    }
  }
  waitAioAndPostRecordGuard.report(true);

  recordGuard.succ();
  co_return rsp;
}