CoTryTask Distributor::update()

in src/meta/components/Distributor.cc [218:319]


/**
 * One heartbeat / membership round of the distributor, executed inside `txn`.
 *
 *  1. Load the persisted server map and compare it with the cached copy in `latest_`.
 *     If the loaded versionstamp is older than the cached one, fail with
 *     `TransactionCode::kTooOld` so the caller can retry (a newer loaded version is fatal).
 *  2. Snapshot-list every per-server key under `kPrefix`, and record in `servers_` the local
 *     steady-clock time at which each peer's stored versionstamp was last seen to change.
 *  3. Every active peer (other than this node) whose entry was not refreshed within
 *     `config_.timeout()` before the check started is treated as dead.
 *  4. Write this node's own per-server key as a versionstamped value.
 *  5. If this node is missing from the active set, any peer is dead, or `exit` is set,
 *     write a new server map: the current active set minus dead nodes, plus this node
 *     unless it is exiting.
 *
 * @param txn  read-write transaction used for every read and write in this round.
 * @param exit true when this node is leaving; it is then removed from the new active set.
 * @return true if a new server map was written; false if the map was left unchanged;
 *         or an error from a KV operation or from the staleness check in step 1.
 */
CoTryTask<bool> Distributor::update(kv::IReadWriteTransaction &txn, bool exit) {
  XLOGF(INFO, "{} update, exit {}", nodeId_, exit);
  auto current = co_await loadServerMap(txn, true);
  CO_RETURN_ON_ERROR(current);
  // Taken before listing peers: any peer refreshed below gets a lastUpdate later than this,
  // so it cannot be declared dead in the same round.
  auto startCheck = SteadyClock::now();

  {
    auto rlock = latest_.rlock();
    XLOGF_IF(FATAL,
             current->versionstamp > rlock->versionstamp,
             "{} > {}",
             FMT_KEY(current->versionstamp),
             FMT_KEY(rlock->versionstamp));
    if (current->versionstamp < rlock->versionstamp) {
      XLOGF(WARN, "version {} < {}, need retry", FMT_KEY(current->versionstamp), FMT_KEY(rlock->versionstamp));
      co_return makeError(TransactionCode::kTooOld, "distributor versionstamp changed");
    } else {
      XLOGF_IF(DFATAL,
               current->active != rlock->active,
               "versionstamp {}, {} != {}",
               FMT_KEY(current->versionstamp),
               fmt::join(current->active.begin(), current->active.end(), ","),
               fmt::join(rlock->active.begin(), rlock->active.end(), ","));
    }
  }

  // Snapshot read of all per-server keys (no limit). NOTE(review): presumably snapshot so that
  // peers' concurrent heartbeat writes do not make this transaction conflict — confirm.
  auto opts = kv::TransactionHelper::ListByPrefixOptions().withSnapshot(true).withInclusive(false).withLimit(0);
  auto result = co_await kv::TransactionHelper::listByPrefix(txn, fmt::format("{}-", kPrefix), opts);
  CO_RETURN_ON_ERROR(result);

  std::set<flat::NodeId> dead;
  servers_.withWLock([&](auto &servers) {
    bool self = false;
    for (auto &[key, versionstamp] : *result) {
      auto nodeId = PerServerKey::unpack(key);
      if (!nodeId) {
        XLOGF(DFATAL, "Failed to unpack key {}", key);
        continue;
      } else if (nodeId == nodeId_) {
        self = true;
        continue;
      }
      // Look the peer up once; operator[] default-constructs the entry on first sighting,
      // exactly as the previous repeated servers[nodeId] accesses did.
      const bool seen = servers.contains(nodeId);
      auto &entry = servers[nodeId];
      if (!seen || entry.versionstamp != versionstamp) {
        // A changed versionstamp means the peer wrote its heartbeat since our last look.
        XLOGF(INFO,
              "{} found {} alive, prev {}, curr {}",
              nodeId_,
              nodeId,
              FMT_KEY(entry.versionstamp),
              FMT_KEY(versionstamp));
        entry = {versionstamp, SteadyClock::now()};
      }
    }
    XLOGF_IF(DFATAL, (updated_ != 0 && !self), "self {} not found!!!", nodeId_);

    auto timeout = config_.timeout();
    for (auto nodeId : current->active) {
      // Bound by reference (no copy). operator[] still default-constructs an entry for an
      // active node never seen in the listing; NOTE(review): whether such a node counts as
      // dead depends on the default value of its lastUpdate — confirm against the entry type.
      const auto &state = servers[nodeId];
      if (nodeId != nodeId_ && state.lastUpdate + timeout < startCheck) {
        XLOGF(CRITICAL, "{} mark {} as dead, not update in {}", nodeId_, nodeId, timeout);
        dead.emplace(nodeId);
      }
    }
  });

  // Heartbeat: write our own per-server key as a versionstamped value. The zero-filled buffer
  // is a placeholder at offset 0, presumably overwritten by the versionstamp at commit.
  auto key = PerServerKey::pack(nodeId_);
  std::array<char, sizeof(kv::Versionstamp)> buf{0};
  CO_RETURN_ON_ERROR(co_await txn.setVersionstampedValue(key, {buf.data(), buf.size()}, 0));

  bool needUpdate = false;
  if (!exit && std::find(current->active.begin(), current->active.end(), nodeId_) == current->active.end()) {
    XLOGF(INFO, "{} not in server map, create a new map", nodeId_);
    needUpdate = true;
  }
  if (!dead.empty()) {
    XLOGF(INFO, "{} found dead servers {}, create a new map", nodeId_, fmt::join(dead.begin(), dead.end(), ","));
    needUpdate = true;
  }
  if (exit) {
    XLOGF(INFO, "{} exiting, create a new map", nodeId_);
    // Reuse the dead-set filtering below to drop ourselves from the active set.
    dead.insert(nodeId_);
    needUpdate = true;
  }

  if (!needUpdate) {
    co_return false;
  }

  // `current` is known to hold a value here (checked by CO_RETURN_ON_ERROR above).
  std::set<flat::NodeId> active;
  for (auto node : current->active) {
    if (!dead.contains(node)) {
      active.insert(node);
    }
  }
  if (!exit) {
    active.insert(nodeId_);
  }
  ServerMap map{std::vector<flat::NodeId>(active.begin(), active.end())};
  CO_RETURN_ON_ERROR(co_await updateServerMap(txn, map));
  co_return true;
}