CoTryTask dumpChunkMeta()

in src/client/cli/admin/DumpChunkMeta.cc [28:166]


// Admin command: dumps the chunk metadata of storage targets into one file per target under
// `--chunkmeta-dir`.
//
// Options read from `parser`:
//   --chain-ids       chains to dump; when empty, every chain in the routing info is dumped.
//                     Requested ids missing from the routing info are reported with a warning.
//   --chunkmeta-dir   output directory; must not exist yet and is created by this command.
//   --parallel        max number of targets dumped concurrently, capped at
//                     hardware_concurrency() / 2. A resulting value of 0 dumps targets one by one
//                     and stops at the first failure.
//   --parquet-format  write `<chainId>-<targetId>.parquet` instead of `<chainId>-<targetId>.meta`.
//   --only-head       dump only the first SERVING target of each chain.
//
// Only targets in SERVING state are dumped; others are skipped with a warning.
// Returns an empty output table on success; kInvalidArg if the output directory already exists;
// kIOError if it cannot be created or any target dump fails; kRoutingError if routing info is
// unavailable or a selected chain has an empty target list.
CoTryTask<Dispatcher::OutputTable> dumpChunkMeta(IEnv &ienv,
                                                 const argparse::ArgumentParser &parser,
                                                 const Dispatcher::Args &args) {
  auto &env = dynamic_cast<AdminEnv &>(ienv);
  ENSURE_USAGE(args.empty());
  Dispatcher::OutputTable table;

  const auto &chainIds = parser.get<std::vector<uint32_t>>("chain-ids");
  const auto &chunkmetaDir = parser.get<std::string>("chunkmeta-dir");
  // hardware_concurrency() may report 0 or 1, in which case the dump runs sequentially.
  const auto parallel = std::min(parser.get<uint32_t>("parallel"), std::thread::hardware_concurrency() / 2);
  const auto parquetFormat = parser.get<bool>("parquet-format");
  const auto onlyHead = parser.get<bool>("only-head");

  // Never reuse an existing output directory.
  if (boost::filesystem::exists(chunkmetaDir)) {
    XLOGF(CRITICAL, "Output directory for chunk metadata already exists: {}", chunkmetaDir);
    co_return makeError(StatusCode::kInvalidArg);
  }

  boost::system::error_code err{};
  boost::filesystem::create_directories(chunkmetaDir, err);

  if (UNLIKELY(err.failed())) {
    XLOGF(CRITICAL, "Failed to create directory {}, error: {}", chunkmetaDir, err.message());
    co_return makeError(StatusCode::kIOError);
  }

  XLOGF(CRITICAL, "Saving chunk metadata to directory: {}", chunkmetaDir);

  CO_RETURN_AND_LOG_ON_ERROR(co_await env.mgmtdClientGetter()->refreshRoutingInfo(/*force=*/true));
  // `routingInfo` stays alive until return, so `chains` may safely refer into it (no copy needed).
  auto routingInfo = env.mgmtdClientGetter()->getRoutingInfo();
  if (!routingInfo || !routingInfo->raw()) {
    XLOGF(CRITICAL, "Routing info is not available after refresh");
    co_return makeError(StorageClientCode::kRoutingError);
  }
  const auto &chains = routingInfo->raw()->chains;

  XLOGF(CRITICAL, "Found {} replication chains in {}", chains.size(), routingInfo->raw()->routingInfoVersion);

  // Select the chains to dump, ordered by chain id. Requested ids are erased from
  // `missingChainIds` as they are found, so whatever remains does not exist in the routing info.
  std::set<uint32_t> missingChainIds(chainIds.begin(), chainIds.end());
  const bool dumpAllChains = missingChainIds.empty();
  std::map<flat::ChainId, flat::ChainInfo> sortedChainInfos;
  for (const auto &[chainId, chainInfo] : chains) {
    if (dumpAllChains || missingChainIds.erase(uint32_t(chainId)) > 0) sortedChainInfos.emplace(chainId, chainInfo);
  }
  for (const auto missingId : missingChainIds) {
    XLOGF(WARN, "Skip chain {} not found in routing info", missingId);
  }

  // Count only chains that will actually be processed, so "#i/N" progress logs are accurate.
  const size_t numChainsToSave = sortedChainInfos.size();
  std::atomic_size_t numTargetsSaved = 0;

  // Dumps one target into `<chunkmetaDir>/<chainId>-<targetId>.{meta,parquet}`; returns true on success.
  // `env` and `chunkmetaDir` are captured by reference: every task created from this lambda is
  // either awaited or destroyed unstarted before this function returns, so both outlive it.
  auto dumpTargetChunkMeta = [&env, &chunkmetaDir, parquetFormat, numChainsToSave, &numTargetsSaved](
                                 const size_t chainIndex,
                                 const flat::ChainId chainId,
                                 const flat::TargetId targetId) -> CoTask<bool> {
    XLOGF(INFO, "#{}/{} Getting chunk metadata from target: {}@{}", chainIndex, numChainsToSave, targetId, chainId);

    auto chunkMetadata = co_await env.storageClientGetter()->getAllChunkMetadata(chainId, targetId);

    if (!chunkMetadata) {
      XLOGF(ERR,
            "Failed to get chunk metadata from target: {}@{}, error: {}",
            targetId,
            chainId,
            chunkMetadata.error());
      co_return false;
    }

    // One timestamp per target, shared by the table header and every row.
    const time_t timestamp = UtcClock::secondsSinceEpoch();
    ChunkMetaTable metadata{chainId, targetId, timestamp};

    for (const auto &chunkmeta : *chunkMetadata) {
      // Decode the meta-level chunk id to record the inode each chunk belongs to.
      auto metaChunkId = meta::ChunkId::unpack(chunkmeta.chunkId.data());
      metadata.chunks.push_back({timestamp, chainId, targetId, metaChunkId.inode(), chunkmeta});
    }

    XLOGF(INFO,
          "#{}/{} Found {} chunks on target: {}@{}",
          chainIndex,
          numChainsToSave,
          metadata.chunks.size(),
          targetId,
          chainId);

    auto filePath =
        Path(chunkmetaDir) / fmt::format("{}-{}.meta", uint32_t(metadata.chainId), uint64_t(metadata.targetId));
    // replace_extension() rewrites `filePath` itself, so the logs below show the real file name.
    const bool writeOk = parquetFormat ? metadata.dumpToParquetFile(filePath.replace_extension(".parquet"))
                                       : metadata.dumpToFile(filePath);
    numTargetsSaved += writeOk;

    if (writeOk) {
      XLOGF(WARN,
            "#{}/{} Metadata of {} chunks on {}@{} saved to file: {}",
            chainIndex,
            numChainsToSave,
            metadata.chunks.size(),
            targetId,
            chainId,
            filePath);
    } else {
      XLOGF(ERR,
            "#{}/{} Failed to save metadata of {} chunks on {}@{} to file: {}",
            chainIndex,
            numChainsToSave,
            metadata.chunks.size(),
            targetId,
            chainId,
            filePath);
    }
    co_return writeOk;
  };

  // The thread pool is only used in concurrent mode; with parallel == 0 each dump is awaited inline.
  std::unique_ptr<folly::CPUThreadPoolExecutor> executor;
  if (parallel > 0) executor = std::make_unique<folly::CPUThreadPoolExecutor>(parallel);
  std::vector<folly::coro::TaskWithExecutor<bool>> tasks;
  size_t chainIndex = 0;

  for (const auto &[chainId, chainInfo] : sortedChainInfos) {
    chainIndex++;

    if (chainInfo.targets.empty()) {
      XLOGF(CRITICAL, "Empty list of targets on {}: {}", chainId, chainInfo);
      co_return makeError(StorageClientCode::kRoutingError);
    }

    for (const auto &target : chainInfo.targets) {
      if (target.publicState != flat::PublicTargetState::SERVING) {
        XLOGF(WARN, "Skip target {}@{} not in serving state: {}", target.targetId, chainId, target);
        continue;
      }

      if (parallel > 0) {
        tasks.push_back(dumpTargetChunkMeta(chainIndex, chainId, target.targetId)
                            .scheduleOn(folly::Executor::getKeepAliveToken(*executor)));
      } else {
        bool ok = co_await dumpTargetChunkMeta(chainIndex, chainId, target.targetId);
        if (!ok) co_return makeError(StatusCode::kIOError);
      }

      // --only-head: stop after the first SERVING target (taken as the chain head). Breaking after
      // the first *listed* target would dump nothing for a chain whose first entry is not serving.
      if (onlyHead) break;
    }
  }

  if (parallel > 0 && !tasks.empty()) {
    auto results = co_await folly::coro::collectAllWindowed(std::move(tasks), parallel);
    if (!std::all_of(results.begin(), results.end(), [](bool ok) { return ok; })) {
      XLOGF(CRITICAL, "Some of the chunkmeta dump tasks failed");
      co_return makeError(StatusCode::kIOError);
    }
  }

  XLOGF(CRITICAL,
        "Chunk metadata on {} targets from {} chains saved to directory: {}",
        numTargetsSaved.load(),
        numChainsToSave,
        chunkmetaDir);

  co_return table;
}