in src/broker.h [123:230]
void update() {
auto now = std::chrono::steady_clock::now();
if (!syncSet.empty()) {
for (auto i = syncSet.begin(); i != syncSet.end();) {
auto& g = **i;
std::lock_guard l(g.mutex);
size_t total = 0;
size_t ready = 0;
for ([[maybe_unused]] auto& [pname, p] : g.peers) {
if (p.syncFuture) {
++total;
if (*p.syncFuture) {
if ((*p.syncFuture)->first == g.syncId) {
++ready;
} else {
log.info("bad sync id?? got %#x expected %#x", (*p.syncFuture)->first, g.syncId);
--total;
}
}
}
}
// log("Sync midway %s %d/%d in %gs\n", g.name, ready, total, seconds(now - g.lastUpdate));
if (ready >= total || now - g.lastUpdate >= std::chrono::seconds(1)) {
log.info("Sync %s %d/%d in %gs\n", g.name, ready, total, seconds(now - g.lastUpdate));
tmpPeers.clear();
for ([[maybe_unused]] auto& [pname, p] : g.peers) {
if (p.syncFuture && *p.syncFuture && (*p.syncFuture)->first == g.syncId) {
p.sortOrder = (*p.syncFuture)->second;
tmpPeers.push_back(&p);
p.active = true;
} else {
p.active = false;
}
}
std::sort(tmpPeers.begin(), tmpPeers.end(), [](Peer* a, Peer* b) {
if (a->sortOrder == b->sortOrder) {
return a->creationOrder < b->creationOrder;
}
return a->sortOrder < b->sortOrder;
});
g.active.clear();
for (auto* p : tmpPeers) {
log.info("%s with sort order %d\n", p->name, p->sortOrder);
g.active.push_back(p->name);
}
if (!g.active.empty()) {
log.info("%s is the master\n", g.active.front());
}
for (auto* p : tmpPeers) {
p->updateFuture = call<void>(p->name, "GroupService::update", g.name, g.syncId, g.active);
}
i = syncSet.erase(i);
} else {
++i;
}
}
}
if (now - lastCheckTimeouts < std::chrono::milliseconds(500)) {
return;
}
lastCheckTimeouts = now;
tmpGroups.clear();
{
std::lock_guard l(groupsMutex);
for (auto& [gname, g] : groups) {
tmpGroups.push_back(&g);
}
}
for (auto* pg : tmpGroups) {
auto& g = *pg;
std::lock_guard l2(g.mutex);
for (auto i = g.peers.begin(); i != g.peers.end();) {
auto& p = i->second;
if (now - p.lastPing >= p.timeoutDuration) {
log.info("Peer %s::%s timed out\n", g.name, p.name);
if (p.active) {
g.needsUpdate = true;
}
i = g.peers.erase(i);
} else {
++i;
}
}
auto mintime = std::chrono::seconds(2);
if (g.needsUpdate && (now - g.lastUpdate >= mintime)) {
log.info("Initiating update of group %s\n", g.name);
++g.updateCount;
g.lastUpdate = now;
g.needsUpdate = false;
uint32_t syncId = nextSyncId++;
if (syncId == 0) {
syncId = nextSyncId++;
}
g.syncId = syncId;
for ([[maybe_unused]] auto& [pname, p] : g.peers) {
p.syncFuture = call<std::pair<uint32_t, int32_t>>(pname, "GroupService::sync", g.name, syncId);
}
syncSet.insert(&g);
}
}
}