CoTryTask handle()

in src/client/cli/admin/ReadBench.cc [46:205]


CoTryTask<Dispatcher::OutputTable> handle(IEnv &ienv,
                                          const argparse::ArgumentParser &parser,
                                          const Dispatcher::Args &args) {
  auto &env = dynamic_cast<AdminEnv &>(ienv);
  ENSURE_USAGE(args.empty());
  Dispatcher::OutputTable table;

  // 1. parse arguments.
  Path path = parser.get<std::string>("path");
  auto threads = parser.get<uint32_t>("--threads");
  auto coroutines = parser.get<uint32_t>("--coroutines");
  auto deadline = RelativeTime::now() + 1_s * parser.get<uint32_t>("--seconds");
  auto modeStr = parser.present<std::string>("--mode").value_or("Default");
  auto mode = magic_enum::enum_cast<storage::client::TargetSelectionMode>(modeStr);
  auto isWrite = parser.get<bool>("--write");
  if (!mode) {
    co_return makeError(StorageClientCode::kRoutingError, "mode is invalid");
  }
  size_t blockSize = 4_KB;
  if (auto p = parser.present<std::string>("--bs")) {
    auto result = Size::from(*p);
    CO_RETURN_AND_LOG_ON_ERROR(result);
    blockSize = *result;
  }
  size_t iodepth = 1024;
  if (auto p = parser.present<std::string>("--iodepth")) {
    auto result = Size::from(*p);
    CO_RETURN_AND_LOG_ON_ERROR(result);
    iodepth = *result;
  }
  auto pool = std::make_unique<folly::CPUThreadPoolExecutor>(std::make_pair(threads, threads),
                                                             std::make_shared<folly::NamedThreadFactory>("Pool"));

  // 2. open files.
  std::string prev;
  std::vector<FileWrapper> files;
  while (true) {
    auto res = co_await env.metaClientGetter()->list(env.userInfo, env.currentDirId, path, prev, 0, false);
    CO_RETURN_AND_LOG_ON_ERROR(res);
    const auto &lsp = res.value();
    for (const auto &entry : lsp.entries) {
      if (entry.isFile()) {
        auto openResult = co_await FileWrapper::openOrCreateFile(env, path / entry.name);
        CO_RETURN_AND_LOG_ON_ERROR(openResult);
        files.push_back(std::move(*openResult));
      }
    }
    if (lsp.more) {
      prev = lsp.entries.at(lsp.entries.size() - 1).name;
    } else {
      break;
    }
  }

  // 3. read.
  auto storageClient = env.storageClientGetter();
  std::vector<CoTryTask<Void>> total;
  std::atomic<size_t> readBytes{};
  total.reserve(coroutines);
  co_await env.mgmtdClientGetter()->refreshRoutingInfo(true);
  auto rdmabufPool = net::RDMABufPool::create(64_MB, 512);
  for (auto i = 0u; i < coroutines; ++i) {
    total.push_back(folly::coro::co_invoke([&]() -> CoTryTask<Void> {
      std::ofstream out("/dev/null");
      std::vector<storage::client::ReadIO> readIOs;
      std::vector<storage::client::WriteIO> writeIOs;
      std::vector<storage::client::IOBuffer> buffers;
      net::RDMABuf current;
      storage::client::ReadOptions readOptions;
      storage::client::WriteOptions writeOptions;
      readOptions.targetSelection().set_mode(*mode);
      readIOs.reserve(1024);
      writeIOs.reserve(1024);
      buffers.reserve(1024);
      while (RelativeTime::now() <= deadline) {
        readIOs.clear();
        writeIOs.clear();
        buffers.clear();
        current = {};
        auto routingInfo = env.mgmtdClientGetter()->getRoutingInfo()->raw();
        if (UNLIKELY(routingInfo == nullptr)) {
          co_return makeError(StorageClientCode::kRoutingError, "routing info is null");
        }

        for (auto i = 0u; i < iodepth; ++i) {
          auto &file = files[folly::Random::rand32() % files.size()];
          if (UNLIKELY(file.length() == 0)) {
            continue;
          }
          auto chunkSize = file.file().layout.chunkSize;
          auto offset = folly::Random::rand32() * blockSize % file.length();
          auto chainResult = file.file().getChainId(file.inode(), offset, *routingInfo, 0);
          CO_RETURN_AND_LOG_ON_ERROR(chainResult);
          auto chunk =
              file.file().getChunkId(file.inode().id, offset).then([](auto chunk) { return storage::ChunkId(chunk); });
          CO_RETURN_AND_LOG_ON_ERROR(chunk);
          auto l = std::min(file.file().length - offset, blockSize);
          if (current.size() < l) {
            current = co_await rdmabufPool->allocate();
            if (UNLIKELY(!current)) {
              XLOGF(FATAL, "allocate buffer failed");
            }
            buffers.emplace_back(current);
          }
          if (isWrite) {
            writeIOs.push_back(storageClient->createWriteIO(*chainResult,
                                                            *chunk,
                                                            offset % chunkSize,
                                                            l,
                                                            chunkSize,
                                                            current.ptr(),
                                                            &buffers.back()));
          } else {
            readIOs.push_back(
                storageClient
                    ->createReadIO(*chainResult, *chunk, offset % chunkSize, l, current.ptr(), &buffers.back()));
          }
          current.advance(l);
        }

        if (isWrite) {
          auto writeResult = co_await storageClient->batchWrite(writeIOs, env.userInfo, writeOptions);
          CO_RETURN_AND_LOG_ON_ERROR(writeResult);
        } else {
          auto readResult = co_await storageClient->batchRead(readIOs, env.userInfo, readOptions);
          CO_RETURN_AND_LOG_ON_ERROR(readResult);
        }
        size_t succBytes = 0;
        for (auto &io : readIOs) {
          if (LIKELY(bool(io.result.lengthInfo))) {
            succBytes += *io.result.lengthInfo;
          }
        }
        for (auto &io : writeIOs) {
          if (LIKELY(bool(io.result.lengthInfo))) {
            succBytes += *io.result.lengthInfo;
          }
        }
        readBytes += succBytes;
      }
      co_return Void{};
    }));
  }

  std::atomic<bool> stop = false;
  std::jthread t([&] {
    while (!stop) {
      std::this_thread::sleep_for(1_s);
      XLOGF(WARNING, "{}/s", Size::around(readBytes.exchange(0)));
    }
  });

  auto results = co_await folly::coro::collectAllRange(std::move(total)).scheduleOn(pool.get());
  for (auto result : results) {
    CO_RETURN_AND_LOG_ON_ERROR(result);
  }

  stop = true;
  co_return table;
}