in src/client/storage/StorageClientImpl.cc [490:631]
std::vector<Op *> selectRoutingTargetForOps(ClientRequestContext &requestCtx,
const std::shared_ptr<hf3fs::client::RoutingInfo const> &routingInfo,
const TargetSelectionOptions &options,
const std::vector<Op *> &ops) {
if (routingInfo == nullptr) {
XLOGF(ERR, "Unexpected: routing info is nullptr");
setErrorCodeOfOps(ops, StorageClientCode::kRoutingError);
return {};
}
XLOGF_IF(INFO,
requestCtx.retryCount > 0,
"Start to select routing targets for {} {} ops {} based on {}: "
"bootstrapping? {}, {} nodes, {} chain tables, {} chains, {} targets",
ops.size(),
magic_enum::enum_name(requestCtx.methodType),
fmt::ptr(&ops),
routingInfo->raw()->routingInfoVersion,
routingInfo->raw()->bootstrapping,
routingInfo->raw()->nodes.size(),
routingInfo->raw()->chainTables.size(),
routingInfo->raw()->chains.size(),
routingInfo->raw()->targets.size());
std::unordered_map<ChainId, SlimChainInfo> slimChains(ops.size());
std::unordered_map<ChainId, hf3fs::flat::ChainInfo> chainInfos(ops.size());
auto targetSelectionStrategy = TargetSelectionStrategy::create(options);
auto selectNodeInTrafficZone = flat::selectNodeByTrafficZone(options.trafficZone());
std::vector<Op *> targetedOps;
targetedOps.reserve(ops.size());
for (auto op : ops) {
auto chainId = op->routingTarget.chainId;
if (slimChains.count(chainId) == 0) {
auto chainInfo = getChainInfo(routingInfo, hf3fs::flat::ChainId(chainId));
if (!chainInfo) {
setErrorCodeOfOp(op, chainInfo.error().code());
continue;
}
chainInfos.emplace(chainId, *chainInfo);
auto servingTargets = selectServingTargets(routingInfo, *chainInfo, selectNodeInTrafficZone);
if (!servingTargets) {
setErrorCodeOfOp(op, servingTargets.error().code());
continue;
}
if (servingTargets->empty()) {
XLOGF(DBG3,
"All targets on the chain not serving or not in traffic zone ({}): {}",
options.trafficZone(),
*chainInfo);
} else if (targetSelectionStrategy->selectAnyTarget()) {
std::vector<SlimTargetInfo> reachableTargets;
reachableTargets.reserve(servingTargets->size());
for (const auto &[targetId, nodeId] : *servingTargets) {
auto targetOnChain = std::make_tuple(targetId, chainInfo->chainId, chainInfo->chainVersion);
auto iter = requestCtx.numFailures.find(targetOnChain);
if (iter != requestCtx.numFailures.end() &&
iter->second >= requestCtx.clientConfig.retry().max_failures_before_failover()) {
XLOGF(DBG5,
"Target {} on {}@{} should be skipped if possible since its host {} cannot be reached",
targetId,
chainInfo->chainId,
chainInfo->chainVersion,
nodeId);
} else {
reachableTargets.push_back({targetId, nodeId});
}
}
if (reachableTargets.empty()) {
XLOGF(DBG3, "All serving targets on the chain not reachable: {}", *chainInfo);
} else {
XLOGF_IF(DBG5,
reachableTargets.size() < servingTargets->size(),
"Found {}/{} reachable targets on the chain: {}",
reachableTargets.size(),
servingTargets->size(),
*chainInfo);
servingTargets->swap(reachableTargets);
}
}
SlimChainInfo &slimChain = slimChains[chainId];
slimChain.chainId = chainId;
slimChain.version = chainInfo->chainVersion;
slimChain.routingInfoVer = routingInfo->raw()->routingInfoVersion;
slimChain.totalNumTargets = chainInfo->targets.size();
slimChain.servingTargets.swap(*servingTargets);
if (slimChain.servingTargets.size() < chainInfo->targets.size()) {
targetSelectionStrategy->reset();
}
}
const SlimChainInfo &slimChain = slimChains[chainId];
op->routingTarget.chainVer = slimChain.version;
op->routingTarget.routingInfoVer = slimChain.routingInfoVer;
if (slimChain.servingTargets.empty()) {
XLOGF(WARN,
"All targets on chain {}@{} not serving/reachable or not in traffic zone ({}): {}",
slimChain.chainId,
slimChain.version,
options.trafficZone(),
chainInfos[chainId]);
setErrorCodeOfOp(op, StorageClientCode::kNotAvailable);
continue;
}
auto selectedTarget = targetSelectionStrategy->selectTarget(slimChain);
if (!selectedTarget) {
XLOGF(WARN,
"Unable to select a routing target from {}/{} serving targets on {}@{}, selection mode {}: {}",
slimChain.servingTargets.size(),
slimChain.totalNumTargets,
slimChain.chainId,
slimChain.version,
toStringView(options.mode()),
chainInfos[chainId]);
setErrorCodeOfOp(op, selectedTarget.error().code());
continue;
}
op->routingTarget.targetInfo = *selectedTarget;
XLOGF(DBG7, "Selected routing target for operation {}: {}", fmt::ptr(op), op->routingTarget);
targetedOps.push_back(op);
}
return targetedOps;
}