std::vector<Op *> selectRoutingTargetForOps()

in src/client/storage/StorageClientImpl.cc [490:631]


/// Picks a routing target for every op in `ops` and returns the ops that received one.
///
/// Per-chain work (chain lookup, serving-target selection, reachability filtering) is done
/// once per distinct chain id and cached for the remaining ops of this call. A chain whose
/// lookup or serving-target selection fails is not cached, so the next op on that chain
/// repeats the attempt and receives its own error code.
///
/// @param requestCtx   Request context; its retry count, method type, per-target failure
///                     counters and client config are read here.
/// @param routingInfo  Routing snapshot to resolve chains against. If null, every op is
///                     marked with `StorageClientCode::kRoutingError` and an empty vector
///                     is returned.
/// @param options      Target selection options (traffic zone, selection mode).
/// @param ops          Ops to route. Each op's `routingTarget` is updated in place; ops that
///                     cannot be routed get an error code via `setErrorCodeOfOp`.
/// @return The subset of `ops`, in input order, for which a target was selected.
std::vector<Op *> selectRoutingTargetForOps(ClientRequestContext &requestCtx,
                                            const std::shared_ptr<hf3fs::client::RoutingInfo const> &routingInfo,
                                            const TargetSelectionOptions &options,
                                            const std::vector<Op *> &ops) {
  if (!routingInfo) {
    XLOGF(ERR, "Unexpected: routing info is nullptr");
    setErrorCodeOfOps(ops, StorageClientCode::kRoutingError);
    return {};
  }

  XLOGF_IF(INFO,
           requestCtx.retryCount > 0,
           "Start to select routing targets for {} {} ops {} based on {}: "
           "bootstrapping? {}, {} nodes, {} chain tables, {} chains, {} targets",
           ops.size(),
           magic_enum::enum_name(requestCtx.methodType),
           fmt::ptr(&ops),
           routingInfo->raw()->routingInfoVersion,
           routingInfo->raw()->bootstrapping,
           routingInfo->raw()->nodes.size(),
           routingInfo->raw()->chainTables.size(),
           routingInfo->raw()->chains.size(),
           routingInfo->raw()->targets.size());

  // Per-call caches keyed by chain id: the slim view drives selection, the full chain info
  // is kept only so that warnings can print it.
  std::unordered_map<ChainId, SlimChainInfo> slimChainCache(ops.size());
  std::unordered_map<ChainId, hf3fs::flat::ChainInfo> fullChainInfos(ops.size());
  auto strategy = TargetSelectionStrategy::create(options);
  auto zoneFilter = flat::selectNodeByTrafficZone(options.trafficZone());

  // True when the failure counter recorded under `failureKey` has reached the configured
  // failover threshold. The threshold is only read when a counter exists for the key.
  const auto reachedFailoverLimit = [&requestCtx](const auto &failureKey) -> bool {
    auto failureIter = requestCtx.numFailures.find(failureKey);
    if (failureIter == requestCtx.numFailures.end()) {
      return false;
    }
    return failureIter->second >= requestCtx.clientConfig.retry().max_failures_before_failover();
  };

  std::vector<Op *> routedOps;
  routedOps.reserve(ops.size());

  for (Op *op : ops) {
    auto chainId = op->routingTarget.chainId;

    const SlimChainInfo *resolvedChain = nullptr;

    if (auto cached = slimChainCache.find(chainId); cached != slimChainCache.end()) {
      resolvedChain = &cached->second;
    } else {
      // First op seen on this chain (or earlier attempts failed): resolve it now.
      auto lookedUp = getChainInfo(routingInfo, hf3fs::flat::ChainId(chainId));

      if (!lookedUp) {
        setErrorCodeOfOp(op, lookedUp.error().code());
        continue;
      }

      fullChainInfos.emplace(chainId, *lookedUp);

      auto serving = selectServingTargets(routingInfo, *lookedUp, zoneFilter);

      if (!serving) {
        setErrorCodeOfOp(op, serving.error().code());
        continue;
      }

      if (serving->empty()) {
        XLOGF(DBG3,
              "All targets on the chain not serving or not in traffic zone ({}): {}",
              options.trafficZone(),
              *lookedUp);
      } else if (strategy->selectAnyTarget()) {
        // Drop targets that have failed too often, but only if at least one target
        // survives; otherwise keep the full serving set.
        std::vector<SlimTargetInfo> reachable;
        reachable.reserve(serving->size());

        for (const auto &[targetId, nodeId] : *serving) {
          if (reachedFailoverLimit(std::make_tuple(targetId, lookedUp->chainId, lookedUp->chainVersion))) {
            XLOGF(DBG5,
                  "Target {} on {}@{} should be skipped if possible since its host {} cannot be reached",
                  targetId,
                  lookedUp->chainId,
                  lookedUp->chainVersion,
                  nodeId);
            continue;
          }
          reachable.push_back({targetId, nodeId});
        }

        if (!reachable.empty()) {
          XLOGF_IF(DBG5,
                   reachable.size() < serving->size(),
                   "Found {}/{} reachable targets on the chain: {}",
                   reachable.size(),
                   serving->size(),
                   *lookedUp);
          serving->swap(reachable);
        } else {
          XLOGF(DBG3, "All serving targets on the chain not reachable: {}", *lookedUp);
        }
      }

      SlimChainInfo &freshChain = slimChainCache[chainId];
      freshChain.chainId = chainId;
      freshChain.version = lookedUp->chainVersion;
      freshChain.routingInfoVer = routingInfo->raw()->routingInfoVersion;
      freshChain.totalNumTargets = lookedUp->targets.size();
      freshChain.servingTargets.swap(*serving);

      // The strategy is reset whenever the usable target set is smaller than the chain.
      if (freshChain.servingTargets.size() < lookedUp->targets.size()) {
        strategy->reset();
      }

      resolvedChain = &freshChain;
    }

    const SlimChainInfo &slimChain = *resolvedChain;

    // Version stamps are written even when no target can be chosen below.
    op->routingTarget.chainVer = slimChain.version;
    op->routingTarget.routingInfoVer = slimChain.routingInfoVer;

    if (slimChain.servingTargets.empty()) {
      XLOGF(WARN,
            "All targets on chain {}@{} not serving/reachable or not in traffic zone ({}): {}",
            slimChain.chainId,
            slimChain.version,
            options.trafficZone(),
            fullChainInfos[chainId]);
      setErrorCodeOfOp(op, StorageClientCode::kNotAvailable);
      continue;
    }

    auto chosen = strategy->selectTarget(slimChain);

    if (chosen) {
      op->routingTarget.targetInfo = *chosen;

      XLOGF(DBG7, "Selected routing target for operation {}: {}", fmt::ptr(op), op->routingTarget);

      routedOps.push_back(op);
      continue;
    }

    XLOGF(WARN,
          "Unable to select a routing target from {}/{} serving targets on {}@{}, selection mode {}: {}",
          slimChain.servingTargets.size(),
          slimChain.totalNumTargets,
          slimChain.chainId,
          slimChain.version,
          toStringView(options.mode()),
          fullChainInfos[chainId]);
    setErrorCodeOfOp(op, chosen.error().code());
  }

  return routedOps;
}