Result TargetMap::updateRouting()

in src/storage/service/TargetMap.cc [124:283]


/**
 * Apply a routing-info snapshot to the local target map.
 *
 * The per-target routing fields (isHead, isTail, vChainId, publicState, successor) of every
 * target in targets_, plus chainToTarget_ and syncingChains_, are cleared and then rebuilt
 * from `r`. For each chain in the routing info, the first chain member that exists in this
 * map is treated as the local member of that chain:
 *   - its local state is recomputed via updateLocalState();
 *   - its storage target's chain id is set if it was still unset, and validated otherwise;
 *   - if it is SERVING or SYNCING, the next chain member (when SERVING or SYNCING) becomes
 *     its successor, together with that member's node info.
 * Targets that were LASTSRV before this update and are now SERVING/SYNCING/WAITING get
 * resetUncommitted() called with their new chain version.
 *
 * @param r    routing-info snapshot; must be non-null and not older than the version already
 *             applied (an equal version is accepted).
 * @param log  when true, head/tail role changes are logged at WARNING level.
 * @return Void on success. StorageClientCode::kRoutingError when `r` is null, the snapshot is
 *         stale, a referenced target is missing, a chain maps to two local targets, or a
 *         storage target is bound to a different chain. Failures from setChainId() are
 *         returned through RETURN_AND_LOG_ON_ERROR.
 *
 * NOTE(review): member state is reset before validation finishes, so an error return leaves
 * the map partially updated — presumably callers apply this to a copy; confirm.
 */
Result<Void> TargetMap::updateRouting(std::shared_ptr<hf3fs::client::RoutingInfo> r, bool log /* = true */) {
  auto recordGuard = updateRoutingRecorder.record();

  if (UNLIKELY(r == nullptr)) {
    XLOGF(ERR, "routing info is empty");
    return makeError(StorageClientCode::kRoutingError, "routing info is empty");
  }
  auto &routingInfo = r->raw();
  // Reject snapshots older than the one already applied; an equal version is re-applied.
  if (routingInfoVersion_ > routingInfo->routingInfoVersion) {
    auto msg = fmt::format("routing info expired! {} > {}", routingInfoVersion_, routingInfo->routingInfoVersion);
    XLOG(ERR, msg);
    return makeError(StorageClientCode::kRoutingError, std::move(msg));
  }
  XLOGF(INFO, "routing info updated, {} -> {}", routingInfoVersion_, routingInfo->routingInfoVersion);

  // 1. reset current state. The previous head/tail/LASTSRV sets are remembered so role
  //    changes can be reported and LASTSRV targets can be handled after the rebuild.
  routingInfoVersion_ = routingInfo->routingInfoVersion;
  chainToTarget_.clear();
  syncingChains_.clear();
  robin_hood::unordered_set<TargetId> headTargets;
  robin_hood::unordered_set<TargetId> tailTargets;
  robin_hood::unordered_set<TargetId> lastSrvTargets;
  for (auto &[targetId, target] : targets_) {
    if (target.isHead) {
      headTargets.insert(target.targetId);
    }
    if (target.isTail) {
      tailTargets.insert(target.targetId);
    }
    if (target.publicState == flat::PublicTargetState::LASTSRV) {
      lastSrvTargets.insert(target.targetId);
    }
    target.isHead = false;
    target.isTail = false;
    target.vChainId = VersionedChainId{};
    target.publicState = flat::PublicTargetState::INVALID;
    target.successor = std::nullopt;
  }
  // Dump the whole routing info once, on scope exit, if any successor was found invalid.
  bool invalidRoutingInfo = false;
  auto invalidRoutingInfoLogGuard = folly::makeGuard([&] {
    if (invalidRoutingInfo) {
      XLOGF(CRITICAL, "invalid routing info: {}", *routingInfo);
    }
  });

  // 2. iterate routing info.
  for (auto &[id, chain] : routingInfo->chains) {
    // 3. find the first chain member that is a local target; chains without one are skipped.
    auto it = std::find_if(chain.targets.begin(), chain.targets.end(), [&](const flat::ChainTargetInfo &targetInfo) {
      return bool(getMutableTarget(targetInfo.targetId));
    });
    if (it == chain.targets.end()) {
      continue;
    }

    // 4. find target info.
    auto targetId = it->targetId;
    auto targetInfo = routingInfo->getTarget(targetId);
    if (UNLIKELY(!targetInfo)) {
      auto msg = fmt::format("targetInfo id {} not found", targetId);
      XLOG(ERR, msg);
      return makeError(StorageClientCode::kRoutingError, std::move(msg));
    }

    // 5. update local target.
    CHECK_RESULT(target, getMutableTarget(targetId));
    bool targetIsServing = targetInfo->publicState == flat::PublicTargetState::SERVING ||
                           targetInfo->publicState == flat::PublicTargetState::SYNCING;
    auto previousLocalState = target->localState;
    // Head = a serving target that is the first entry of the chain.
    target->isHead = (targetIsServing && it == chain.targets.begin());
    target->vChainId = VersionedChainId{chain.chainId, chain.chainVersion};
    if (target->storageTarget != nullptr) {
      // Bind an unbound storage target to this chain; afterwards the binding must match.
      if (target->storageTarget->chainId() == ChainId{}) {
        RETURN_AND_LOG_ON_ERROR(target->storageTarget->setChainId(chain.chainId));
      }
      if (target->storageTarget->chainId() != chain.chainId) {
        auto msg = fmt::format("target.chain != routing.chain, target {}, chain {}", *target, chain);
        XLOG(ERR, msg);
        return makeError(StorageClientCode::kRoutingError, std::move(msg));
      }
    }
    target->localState = updateLocalState(targetId, previousLocalState, targetInfo->publicState);
    target->publicState = targetInfo->publicState;
    auto [chainToTargetIt, succ] = chainToTarget_.emplace(chain.chainId, targetId);
    if (!succ) {
      auto msg = fmt::format("chain {} map to 2 targets {}, {}", chain.chainId, chainToTargetIt->second, targetId);
      XLOG(ERR, msg);
      return makeError(StorageClientCode::kRoutingError, std::move(msg));
    }

    // The target has just transitioned to OFFLINE: release the strong storage-target reference,
    // keeping only the weak handle from aliveWeakPtr(), and skip successor resolution.
    if (previousLocalState != flat::LocalTargetState::OFFLINE &&
        target->localState == flat::LocalTargetState::OFFLINE) {
      // storageTarget can be null (it is null-checked above), so guard the dereference.
      if (target->storageTarget != nullptr) {
        target->weakStorageTarget = target->storageTarget->aliveWeakPtr();
        target->storageTarget = nullptr;
      }
      continue;
    }

    // 6. update successor. Only done when this target is SERVING/SYNCING; the candidate is the
    //    next chain entry, accepted if it is SERVING or SYNCING. The loop body runs at most
    //    once — `while` + `break` is used purely as an early-exit construct.
    while (targetIsServing && ++it != chain.targets.end()) {
      auto successorInfo = routingInfo->getTarget(it->targetId);
      if (UNLIKELY(!successorInfo)) {
        auto msg = fmt::format("successor {} not found", it->targetId);
        XLOG(ERR, msg);
        return makeError(StorageClientCode::kRoutingError, std::move(msg));
      }
      if (successorInfo->publicState == flat::PublicTargetState::SERVING) {
        target->successor = Successor{{}, *successorInfo};
      } else if (successorInfo->publicState == flat::PublicTargetState::SYNCING) {
        target->successor = Successor{{}, *successorInfo};
        // Chains whose successor is still SYNCING are recorded in syncingChains_.
        syncingChains_.push_back(VersionedChainId{chain.chainId, chain.chainVersion});
      }

      if (target->successor) {
        // NOTE(review): on the two `break`s below the successor is kept but its nodeInfo stays
        // default-constructed; confirm downstream code tolerates that.
        if (!successorInfo->nodeId.has_value()) {
          XLOGF(WARNING, "target {} node id is nullopt", it->targetId);
          break;
        }
        auto node = routingInfo->getNode(*successorInfo->nodeId);
        if (!node) {
          XLOGF(WARNING, "node {} not found", *successorInfo->nodeId);
          break;
        }
        target->successor->nodeInfo = *node;
        if (UNLIKELY(target->successor->nodeInfo.app.serviceGroups.empty())) {
          XLOGF(CRITICAL, "successor invalid! chain {}, successor {}, node {}", chain.chainId, *successorInfo, *node);
          invalidRoutingInfo = true;
        }
      }
      break;
    }
    // Tail = a serving target with no serving/syncing successor.
    target->isTail = (targetIsServing && !target->successor.has_value());

    if (headTargets.contains(targetId) != target->isHead) {
      if (target->isHead) {
        XLOGF_IF(WARNING, log, "target {} becomes head", targetId);
      } else {
        XLOGF_IF(WARNING, log, "target {} is no longer head", targetId);
      }
    }
    if (tailTargets.contains(targetId) != target->isTail) {
      if (target->isTail) {
        XLOGF_IF(WARNING, log, "target {} becomes tail", targetId);
      } else {
        XLOGF_IF(WARNING, log, "target {} is no longer tail", targetId);
      }
    }
  }

  // Targets that were LASTSRV before this update and now are SERVING/SYNCING/WAITING (and still
  // hold a storage target) call resetUncommitted() with the new chain version.
  for (auto &[targetId, target] : targets_) {
    if (lastSrvTargets.contains(targetId) && target.storageTarget &&
        (target.publicState == flat::PublicTargetState::SERVING ||
         target.publicState == flat::PublicTargetState::SYNCING ||
         target.publicState == flat::PublicTargetState::WAITING)) {
      target.storageTarget->resetUncommitted(target.vChainId.chainVer);
    }
  }

  recordGuard.succ();
  return Void{};
}