in src/storage/service/TargetMap.cc [124:283]
Result<Void> TargetMap::updateRouting(std::shared_ptr<hf3fs::client::RoutingInfo> r, bool log /* = true */) {
auto recordGuard = updateRoutingRecorder.record();
if (UNLIKELY(r == nullptr)) {
XLOGF(ERR, "routing info is empty");
return makeError(StorageClientCode::kRoutingError, "routing info is empty");
}
auto &routingInfo = r->raw();
if (routingInfoVersion_ > routingInfo->routingInfoVersion) {
auto msg = fmt::format("routing info expired! {} > {}", routingInfoVersion_, routingInfo->routingInfoVersion);
XLOG(ERR, msg);
return makeError(StorageClientCode::kRoutingError, std::move(msg));
}
XLOGF(INFO, "routing info updated, {} -> {}", routingInfoVersion_, routingInfo->routingInfoVersion);
// 1. reset current state.
routingInfoVersion_ = routingInfo->routingInfoVersion;
chainToTarget_.clear();
syncingChains_.clear();
robin_hood::unordered_set<TargetId> headTargets;
robin_hood::unordered_set<TargetId> tailTargets;
robin_hood::unordered_set<TargetId> lastSrvTargets;
for (auto &[targetId, target] : targets_) {
if (target.isHead) {
headTargets.insert(target.targetId);
}
if (target.isTail) {
tailTargets.insert(target.targetId);
}
if (target.publicState == flat::PublicTargetState::LASTSRV) {
lastSrvTargets.insert(target.targetId);
}
target.isHead = false;
target.isTail = false;
target.vChainId = VersionedChainId{};
target.publicState = flat::PublicTargetState::INVALID;
target.successor = std::nullopt;
}
bool invalidRoutingInfo = false;
auto invalidRoutingInfoLogGuard = folly::makeGuard([&] {
if (invalidRoutingInfo) {
XLOGF(CRITICAL, "invalid routing info: {}", *routingInfo);
}
});
// 2. iterate routing info.
for (auto &[id, chain] : routingInfo->chains) {
// 3. find target in chain.
auto it = std::find_if(chain.targets.begin(), chain.targets.end(), [&](const flat::ChainTargetInfo &targetInfo) {
return bool(getMutableTarget(targetInfo.targetId));
});
if (it == chain.targets.end()) {
continue;
}
// 4. find target info.
auto targetId = it->targetId;
auto targetInfo = routingInfo->getTarget(targetId);
if (UNLIKELY(!targetInfo)) {
auto msg = fmt::format("targetInfo id {} not found", targetId);
XLOG(ERR, msg);
return makeError(StorageClientCode::kRoutingError, std::move(msg));
}
// 5. update local target.
CHECK_RESULT(target, getMutableTarget(targetId));
bool targetIsServing = targetInfo->publicState == flat::PublicTargetState::SERVING ||
targetInfo->publicState == flat::PublicTargetState::SYNCING;
auto previousLocalState = target->localState;
target->isHead = (targetIsServing && it == chain.targets.begin());
target->vChainId = VersionedChainId{chain.chainId, chain.chainVersion};
if (target->storageTarget != nullptr) {
if (target->storageTarget->chainId() == ChainId{}) {
RETURN_AND_LOG_ON_ERROR(target->storageTarget->setChainId(chain.chainId));
}
if (target->storageTarget->chainId() != chain.chainId) {
auto msg = fmt::format("target.chain != routing.chain, target {}, chain {}", *target, chain);
XLOG(ERR, msg);
return makeError(StorageClientCode::kRoutingError, std::move(msg));
}
}
target->localState = updateLocalState(targetId, previousLocalState, targetInfo->publicState);
target->publicState = targetInfo->publicState;
auto [chainToTargetIt, succ] = chainToTarget_.emplace(chain.chainId, targetId);
if (!succ) {
auto msg = fmt::format("chain {} map to 2 targets {}, {}", chain.chainId, chainToTargetIt->second, targetId);
XLOG(ERR, msg);
return makeError(StorageClientCode::kRoutingError, std::move(msg));
}
if (previousLocalState != flat::LocalTargetState::OFFLINE &&
target->localState == flat::LocalTargetState::OFFLINE) {
target->weakStorageTarget = target->storageTarget->aliveWeakPtr();
target->storageTarget = nullptr;
continue;
}
// 6. update successor.
while (targetIsServing && ++it != chain.targets.end()) {
auto targetInfo = routingInfo->getTarget(it->targetId);
if (UNLIKELY(!targetInfo)) {
auto msg = fmt::format("successor {} not found", it->targetId);
XLOG(ERR, msg);
return makeError(StorageClientCode::kRoutingError, std::move(msg));
}
if (targetInfo->publicState == flat::PublicTargetState::SERVING) {
target->successor = Successor{{}, *targetInfo};
} else if (targetInfo->publicState == flat::PublicTargetState::SYNCING) {
target->successor = Successor{{}, *targetInfo};
syncingChains_.push_back(VersionedChainId{chain.chainId, chain.chainVersion});
}
if (target->successor) {
if (!targetInfo->nodeId.has_value()) {
XLOGF(WARNING, "target {} node id is nullopt", it->targetId);
break;
}
auto node = routingInfo->getNode(*targetInfo->nodeId);
if (!node) {
XLOGF(WARNING, "node {} not found", targetInfo->nodeId);
break;
}
target->successor->nodeInfo = *node;
if (UNLIKELY(target->successor->nodeInfo.app.serviceGroups.empty())) {
XLOGF(CRITICAL, "successor invalid! chain {}, successor {}, node {}", chain.chainId, *targetInfo, *node);
invalidRoutingInfo = true;
}
}
break;
}
target->isTail = (targetIsServing && !target->successor.has_value());
if (headTargets.contains(targetId) ^ target->isHead) {
if (target->isHead) {
XLOGF_IF(WARNING, log, "target {} becomes head", targetId);
} else {
XLOGF_IF(WARNING, log, "target {} is no longer head", targetId);
}
}
if (tailTargets.contains(targetId) ^ target->isTail) {
if (target->isTail) {
XLOGF_IF(WARNING, log, "target {} becomes tail", targetId);
} else {
XLOGF_IF(WARNING, log, "target {} is no longer tail", targetId);
}
}
}
for (auto &[targetId, target] : targets_) {
if (lastSrvTargets.contains(targetId) && target.storageTarget &&
(target.publicState == flat::PublicTargetState::SERVING ||
target.publicState == flat::PublicTargetState::SYNCING ||
target.publicState == flat::PublicTargetState::WAITING)) {
target.storageTarget->resetUncommitted(target.vChainId.chainVer);
}
}
recordGuard.succ();
return Void{};
}