void update()

in src/broker.h [123:230]


  // Periodic maintenance step for the broker. It does two things:
  //
  //  1. For every group currently in `syncSet`, inspect the replies to the
  //     outstanding "GroupService::sync" calls. Once every counted peer has
  //     replied with the group's current sync id, or at least one second has
  //     passed since `g.lastUpdate`, build the ordered list of active peers
  //     and send it to those peers with "GroupService::update".
  //
  //  2. At most once every 500ms: erase peers whose last ping is older than
  //     their timeout, and start a new sync round for every group that is
  //     flagged `needsUpdate` and whose last update is at least 2 seconds old.
  //
  // Locking: each group is processed while holding that group's own mutex;
  // `groupsMutex` is only held while copying the group pointers.
  // NOTE(review): `syncSet`, `tmpPeers`, `tmpGroups`, `lastCheckTimeouts` and
  // `nextSyncId` are accessed without a lock here, so this presumably must be
  // called from a single thread - confirm against the caller.
  void update() {

    // Sampled once so every time comparison in this call uses the same instant.
    auto now = std::chrono::steady_clock::now();

    // ---- Phase 1: complete sync rounds that are in flight. ----
    if (!syncSet.empty()) {

      // Manual iterator loop because finished groups are erased while iterating.
      for (auto i = syncSet.begin(); i != syncSet.end();) {
        // syncSet holds pointers to groups (see the insert at the end of this
        // function), hence the double dereference.
        auto& g = **i;
        std::lock_guard l(g.mutex);
        // total: peers that have a sync request and have not answered with a
        //        mismatching id.
        // ready: peers whose reply carries the group's current sync id.
        size_t total = 0;
        size_t ready = 0;
        for ([[maybe_unused]] auto& [pname, p] : g.peers) {
          if (p.syncFuture) {
            ++total;
            // NOTE(review): the future is presumably truthy once the reply
            // has arrived - confirm against the future type.
            if (*p.syncFuture) {
              if ((*p.syncFuture)->first == g.syncId) {
                ++ready;
              } else {
                log.info("bad sync id?? got %#x expected %#x", (*p.syncFuture)->first, g.syncId);
                // A reply for a different sync id is not counted at all, so
                // it cannot keep the round waiting.
                --total;
              }
            }
          }
        }
        // Disabled debug output.
        // log("Sync midway %s %d/%d in %gs\n", g.name, ready, total, seconds(now - g.lastUpdate));
        // Finish the round when every counted peer has replied, or when one
        // second has passed since g.lastUpdate (set when a round is started
        // in phase 2 below).
        if (ready >= total || now - g.lastUpdate >= std::chrono::seconds(1)) {
          log.info("Sync %s %d/%d in %gs\n", g.name, ready, total, seconds(now - g.lastUpdate));

          // Collect the peers that answered with the current sync id; only
          // those are marked active, every other peer is marked inactive.
          tmpPeers.clear();
          for ([[maybe_unused]] auto& [pname, p] : g.peers) {
            if (p.syncFuture && *p.syncFuture && (*p.syncFuture)->first == g.syncId) {
              // The second element of the reply is used as the sort order.
              p.sortOrder = (*p.syncFuture)->second;
              tmpPeers.push_back(&p);
              p.active = true;
            } else {
              p.active = false;
            }
          }
          // Order by sortOrder, breaking ties by creationOrder.
          std::sort(tmpPeers.begin(), tmpPeers.end(), [](Peer* a, Peer* b) {
            if (a->sortOrder == b->sortOrder) {
              return a->creationOrder < b->creationOrder;
            }
            return a->sortOrder < b->sortOrder;
          });
          // Rebuild the group's active list (peer names) in sorted order.
          g.active.clear();
          for (auto* p : tmpPeers) {
            log.info("%s with sort order %d\n", p->name, p->sortOrder);
            g.active.push_back(p->name);
          }
          // The first peer in the sorted list is reported as the master.
          if (!g.active.empty()) {
            log.info("%s is the master\n", g.active.front());
          }
          // Send the new membership to every active peer; the resulting
          // future is stored on the peer.
          for (auto* p : tmpPeers) {
            p->updateFuture = call<void>(p->name, "GroupService::update", g.name, g.syncId, g.active);
          }

          // Round finished: drop the group from syncSet and continue with the
          // iterator returned by erase.
          i = syncSet.erase(i);
        } else {
          ++i;
        }
      }
    }

    // ---- Phase 2: timeouts and new sync rounds, at most every 500ms. ----
    if (now - lastCheckTimeouts < std::chrono::milliseconds(500)) {
      return;
    }

    lastCheckTimeouts = now;
    // Snapshot the group pointers under groupsMutex so that groupsMutex is
    // released before any per-group mutex is taken.
    // NOTE(review): this relies on the group objects staying alive and at the
    // same address after groupsMutex is released - confirm.
    tmpGroups.clear();
    {
      std::lock_guard l(groupsMutex);
      for (auto& [gname, g] : groups) {
        tmpGroups.push_back(&g);
      }
    }
    for (auto* pg : tmpGroups) {
      auto& g = *pg;
      std::lock_guard l2(g.mutex);
      // Erase peers whose last ping is older than their timeout.
      for (auto i = g.peers.begin(); i != g.peers.end();) {
        auto& p = i->second;
        if (now - p.lastPing >= p.timeoutDuration) {
          log.info("Peer %s::%s timed out\n", g.name, p.name);
          // Only the loss of an active peer flags the group for a new update.
          if (p.active) {
            g.needsUpdate = true;
          }
          i = g.peers.erase(i);
        } else {
          ++i;
        }
      }
      // Minimum time between two updates of the same group.
      auto mintime = std::chrono::seconds(2);
      if (g.needsUpdate && (now - g.lastUpdate >= mintime)) {
        log.info("Initiating update of group %s\n", g.name);
        ++g.updateCount;
        g.lastUpdate = now;
        g.needsUpdate = false;
        // Take the next sync id, skipping 0.
        // NOTE(review): 0 is presumably reserved as "no sync id" - confirm.
        uint32_t syncId = nextSyncId++;
        if (syncId == 0) {
          syncId = nextSyncId++;
        }
        g.syncId = syncId;
        // Ask every remaining peer to sync; the replies are examined by
        // phase 1 on later calls to update().
        for ([[maybe_unused]] auto& [pname, p] : g.peers) {
          p.syncFuture = call<std::pair<uint32_t, int32_t>>(pname, "GroupService::sync", g.name, syncId);
        }
        syncSet.insert(&g);
      }
    }
  }