CoTask batchRead()

in benchmarks/storage_bench/StorageBench.h [563:673]


  CoTask<uint32_t> batchRead(uint32_t instanceId) {
    // create an aligned memory block
    size_t alignedBufSize = ALIGN_UPPER(std::max(size_t(1), benchOptions_.readSize), benchOptions_.memoryAlignment);
    size_t memoryBlockSize = alignedBufSize * benchOptions_.readBatchSize;
    auto memoryBlock = (uint8_t *)folly::aligned_malloc(memoryBlockSize, sysconf(_SC_PAGESIZE));
    auto deleter = [](uint8_t *ptr) { folly::aligned_free(ptr); };
    std::unique_ptr<uint8_t, decltype(deleter)> memoryBlockPtr(memoryBlock, deleter);
    std::memset(memoryBlock, 0, memoryBlockSize);

    // register a block of memory
    auto regRes = storageClient_->registerIOBuffer(memoryBlock, memoryBlockSize);

    if (regRes.hasError()) {
      co_return regRes.error().code();
    }

    std::vector<uint8_t> expectedChunkData(setupConfig_.chunk_size());

    if (benchOptions_.verifyReadData) {
      for (size_t byteIndex = 0; byteIndex < expectedChunkData.size(); byteIndex++) {
        expectedChunkData[byteIndex] = byteIndex;
      }
    }

    // create read IOs

    auto ioBuffer = std::move(*regRes);

    ReadOptions options;
    options.set_enableChecksum(benchOptions_.verifyReadChecksum);
    options.debug().set_bypass_disk_io(benchOptions_.benchmarkNetwork);
    options.debug().set_bypass_rdma_xmit(benchOptions_.benchmarkStorage);
    options.debug().set_inject_random_server_error(benchOptions_.injectRandomServerError);
    options.debug().set_inject_random_client_error(benchOptions_.injectRandomClientError);
    options.retry().set_retry_permanent_error(benchOptions_.retryPermanentError);

    std::vector<double> elapsedMicroSecs;
    uint64_t numReadBytes = 0;
    size_t offsetAlignment =
        benchOptions_.readOffAlignment ? benchOptions_.readOffAlignment : std::max(size_t(1), benchOptions_.readSize);

    std::vector<client::ReadIO> readIOs;
    readIOs.reserve(benchOptions_.readBatchSize);

    auto benchStart = hf3fs::SteadyClock::now();
    std::vector<ChunkInfo> &chunkInfos = chunkInfos_[instanceId];

    while (true) {
      auto accumElapsedSecs = std::chrono::duration_cast<std::chrono::seconds>(hf3fs::SteadyClock::now() - benchStart);
      if (accumElapsedSecs >= std::chrono::seconds(benchOptions_.numReadSecs)) break;

      readIOs.clear();

      for (size_t readIndex = 0; readIndex < benchOptions_.readBatchSize; readIndex++) {
        uint64_t randChunkIndex = folly::Random::rand64(0, chunkInfos.size());
        const auto &[chainId, chunkId, chunkSize] = chunkInfos[randChunkIndex];
        uint32_t offset = folly::Random::rand32(0, setupConfig_.chunk_size() - benchOptions_.readSize);
        uint32_t alignedOffset = ALIGN_LOWER(offset, offsetAlignment);
        auto readIO = storageClient_->createReadIO(chainId,
                                                   chunkId,
                                                   alignedOffset /*offset*/,
                                                   benchOptions_.readSize /*length*/,
                                                   &memoryBlock[readIndex * alignedBufSize],
                                                   &ioBuffer);
        readIOs.push_back(std::move(readIO));
        numReadBytes += benchOptions_.readSize;
      }

      auto rpcStart = hf3fs::SteadyClock::now();

      co_await storageClient_->batchRead(readIOs, flat::UserInfo(), options);

      auto elapsedMicro = std::chrono::duration_cast<std::chrono::microseconds>(hf3fs::SteadyClock::now() - rpcStart);
      elapsedMicroSecs.push_back(elapsedMicro.count());

      if (!benchOptions_.ignoreIOError) {
        for (const auto &readIO : readIOs) {
          if (readIO.result.lengthInfo.hasError()) {
            XLOGF(ERR, "Error in read result: {}", readIO.result);
            co_return readIO.result.lengthInfo.error().code();
          }
          if (readIO.length != *readIO.result.lengthInfo) {
            XLOGF(ERR, "Unexpected read length: {} != {}", *readIO.result.lengthInfo, readIO.length);
            co_return StorageClientCode::kRemoteIOError;
          }
        }
      }

      if (benchOptions_.verifyReadData) {
        for (const auto &readIO : readIOs) {
          auto diffPos = std::mismatch(&readIO.data[0], &readIO.data[readIO.length], &expectedChunkData[readIO.offset]);
          uint32_t byteIndex = diffPos.first - &readIO.data[0];
          if (byteIndex < readIO.length) {
            XLOGF(ERR,
                  "Wrong data at bytes index {} and chunk offset {}: data {:#x} != expected {:#x}",
                  byteIndex,
                  readIO.offset + byteIndex,
                  *diffPos.first,
                  *diffPos.second);
            co_return StorageClientCode::kFoundBug;
          }
        }
      }
    }

    folly::TDigest digest;
    readLatencyDigests_[instanceId] = digest.merge(elapsedMicroSecs);
    numReadBytes_ += numReadBytes;

    co_return StatusCode::kOK;
  }