in src/meta/components/Distributor.cc [218:319]
/**
 * Refresh this node's per-server record and, when membership changed, store a new server map.
 *
 * Everything is done through `txn`, in this order:
 *  1. Load the current server map and compare its versionstamp with the cached `latest_`.
 *     A loaded versionstamp smaller than the cached one ends the call with
 *     TransactionCode::kTooOld.
 *  2. Snapshot-list the keys under "<kPrefix>-" and, for every other node whose versionstamp
 *     is new or different from the one remembered in `servers_`, remember the new versionstamp
 *     together with the current steady-clock time.
 *  3. Mark as dead every node of the loaded map (other than this one) whose remembered time
 *     plus `config_.timeout()` lies before the instant captured right after step 1.
 *  4. Write this node's own per-server key with a versionstamped value.
 *  5. If this node is absent from the map, some node is dead, or `exit` is set, store a new
 *     map built from the surviving active nodes (plus this node unless `exit`).
 *
 * @param txn   read-write transaction used for all reads and writes above.
 * @param exit  true when this node is leaving; it is then excluded from the new map.
 * @return true when a new server map was written, false when the current one was kept.
 *         Results of the awaited steps are passed through CO_RETURN_ON_ERROR
 *         (presumably an early co_return of the error -- macro is defined elsewhere).
 */
CoTryTask<bool> Distributor::update(kv::IReadWriteTransaction &txn, bool exit) {
  XLOGF(INFO, "{} update, exit {}", nodeId_, exit);

  auto loaded = co_await loadServerMap(txn, true);
  CO_RETURN_ON_ERROR(loaded);
  // Reference instant for the timeout test; taken before the per-server keys are listed.
  const auto checkStart = SteadyClock::now();

  {
    // The guard is confined to this scope so it does not live across the next co_await.
    auto cached = latest_.rlock();
    XLOGF_IF(FATAL,
             loaded->versionstamp > cached->versionstamp,
             "{} > {}",
             FMT_KEY(loaded->versionstamp),
             FMT_KEY(cached->versionstamp));
    if (loaded->versionstamp < cached->versionstamp) {
      XLOGF(WARN, "version {} < {}, need retry", FMT_KEY(loaded->versionstamp), FMT_KEY(cached->versionstamp));
      co_return makeError(TransactionCode::kTooOld, "distributor versionstamp changed");
    }
    // Not smaller than the cached versionstamp: the two member lists are expected to agree.
    XLOGF_IF(DFATAL,
             loaded->active != cached->active,
             "versionstamp {}, {} != {}",
             FMT_KEY(loaded->versionstamp),
             fmt::join(loaded->active.begin(), loaded->active.end(), ","),
             fmt::join(cached->active.begin(), cached->active.end(), ","));
  }

  auto listOpts = kv::TransactionHelper::ListByPrefixOptions().withSnapshot(true).withInclusive(false).withLimit(0);
  auto listed = co_await kv::TransactionHelper::listByPrefix(txn, fmt::format("{}-", kPrefix), listOpts);
  CO_RETURN_ON_ERROR(listed);

  std::set<flat::NodeId> dead;
  servers_.withWLock([&](auto &servers) {
    // Pass 1: fold the listed per-server keys into the remembered state.
    bool sawSelf = false;
    for (auto &[rawKey, stamp] : *listed) {
      auto peer = PerServerKey::unpack(rawKey);
      if (!peer) {
        XLOGF(DFATAL, "Failed to unpack key {}", rawKey);
        continue;
      }
      if (peer == nodeId_) {
        sawSelf = true;
        continue;
      }

      // Only a first sighting or a changed versionstamp refreshes the remembered time.
      const bool changed = !servers.contains(peer) || servers[peer].versionstamp != stamp;
      if (!changed) {
        continue;
      }
      XLOGF(INFO,
            "{} found {} alive, prev {}, curr {}",
            nodeId_,
            peer,
            FMT_KEY(servers[peer].versionstamp),
            FMT_KEY(stamp));
      servers[peer] = {stamp, SteadyClock::now()};
    }
    XLOGF_IF(DFATAL, (updated_ != 0 && !sawSelf), "self {} not found!!!", nodeId_);

    // Pass 2: every active node except this one must have been refreshed within the timeout.
    const auto timeout = config_.timeout();
    for (auto member : loaded->active) {
      // operator[] creates a default entry for a node that was never listed.
      const auto lastSeen = servers[member].lastUpdate;
      const bool expired = lastSeen + timeout < checkStart;
      if (member != nodeId_ && expired) {
        XLOGF(CRITICAL, "{} mark {} as dead, not update in {}", nodeId_, member, timeout);
        dead.emplace(member);
      }
    }
  });

  // Write this node's own key; the zero-filled buffer is sized to hold a kv::Versionstamp.
  auto selfKey = PerServerKey::pack(nodeId_);
  std::array<char, sizeof(kv::Versionstamp)> placeholder{0};
  CO_RETURN_ON_ERROR(co_await txn.setVersionstampedValue(selfKey, {placeholder.data(), placeholder.size()}, 0));

  // Decide whether the server map has to be rewritten.
  bool rebuild = false;
  if (!exit) {
    const auto &members = loaded->active;
    if (std::find(members.begin(), members.end(), nodeId_) == members.end()) {
      XLOGF(INFO, "{} not in server map, create a new map", nodeId_);
      rebuild = true;
    }
  }
  if (!dead.empty()) {
    XLOGF(INFO, "{} found dead servers {}, create a new map", nodeId_, fmt::join(dead.begin(), dead.end(), ","));
    rebuild = true;
  }
  if (exit) {
    XLOGF(INFO, "{} exiting, create a new map", nodeId_);
    // Treat this node like a dead one so it is filtered out below.
    dead.insert(nodeId_);
    rebuild = true;
  }
  if (!rebuild) {
    co_return false;
  }

  // New membership: current active nodes minus the dead ones, plus this node unless exiting.
  std::set<flat::NodeId> survivors;
  if (loaded) {
    for (auto member : loaded->active) {
      if (dead.contains(member)) {
        continue;
      }
      survivors.insert(member);
    }
  }
  if (!exit) {
    survivors.insert(nodeId_);
  }

  ServerMap next{std::vector<flat::NodeId>(survivors.begin(), survivors.end())};
  CO_RETURN_ON_ERROR(co_await updateServerMap(txn, next));
  co_return true;
}