bool update()

in src/group.h [380:476]


  // Runs one maintenance step for `group` while holding `group.mutex`.
  //
  // In order, a call:
  //   1. stores `sortOrder` on the group;
  //   2. applies a finished resync (new sync id + member list) if one is pending,
  //      or retries a resync that previously reported an error;
  //   3. pings the broker when no ping has been sent yet, when an update was just
  //      applied, or when the ping interval (min(4s, timeout / 2)) has elapsed,
  //      dropping the member list and resyncing if the broker stayed silent for
  //      too long or answered with a sync id different from ours;
  //   4. cancels every outstanding all-reduce if the group changed, otherwise
  //      prunes finished ones and fails those that exceeded `rpc->getTimeout()`.
  //
  // `timeoutMilliseconds` is forwarded to the ping call and also bounds how long
  // the broker may stay silent before the connection is considered inactive.
  //
  // Returns true when the group changed during this call (resync result applied,
  // broker silence detected, or sync id mismatch), false otherwise.
  bool update(GroupInfo& group, uint32_t sortOrder, uint32_t timeoutMilliseconds) {
    // Sampled before the mutex is acquired; used for all ping bookkeeping below.
    auto now = std::chrono::steady_clock::now();

    // Records the ping time and issues a new BrokerService::ping, replacing
    // whatever future was stored from the previous ping.
    auto sendPing = [&]() {
      group.lastPing = now;
      group.hasPinged = true;
      group.pingFuture = callImpl<uint32_t>(
          *rpc, group.brokerName, "BrokerService::ping", group.name, rpc->getName(), timeoutMilliseconds);
    };

    std::lock_guard guard(group.mutex);
    group.sortOrder = sortOrder;

    bool updated = group.isResyncing && group.haveUpdate;

    // Forgets the current membership, requests a fresh one and reports the
    // group as changed to the caller.
    auto dropMembersAndResync = [&]() {
      group.members.clear();
      group.syncId = 0;
      resync(group);
      updated = true;
    };

    if (updated) {
      // A resync has delivered its result: make it the current state.
      group.wantsResync = false;
      group.isResyncing = false;
      group.haveUpdate = false;
      group.syncId = group.newSyncId;
      group.members = std::move(group.newMembers);

      // NOTE(review): the keep-alive block below also runs when `updated` is
      // set, so an applied update pings twice in the same call — confirm this
      // is intended.
      sendPing();
    } else if (group.wantsResync && !group.isResyncing && group.resyncError) {
      log.verbose("Error during resync, trying again");
      group.resyncError = false;
      resync(group);
    }

    const auto pingInterval =
        std::min(std::chrono::milliseconds(4000), std::chrono::milliseconds(timeoutMilliseconds) / 2);
    const bool pingDue = !group.hasPinged || updated || now - group.lastPing >= pingInterval;

    if (pingDue) {
      // A ping was sent earlier and its future is still falsy: treated as
      // "the broker has not answered yet".
      const bool awaitingResponse = group.hasPinged && !group.pingFuture;

      if (!awaitingResponse) {
        group.brokerConnectionIsActive = true;
        group.lastPingResponse = now;
      } else {
        const auto silence = now - group.lastPingResponse;
        if (silence >= std::chrono::seconds(8)) {
          log.verbose("Broker has not responded in %g seconds!\n", seconds(silence));
        }
        // Only react once per outage: the flag is cleared here and set again
        // when a later call takes the other branch.
        if (group.brokerConnectionIsActive &&
            silence >= std::chrono::seconds(1) + std::chrono::milliseconds(timeoutMilliseconds)) {
          group.brokerConnectionIsActive = false;
          dropMembersAndResync();
        }
      }

      // The ping result is compared against our sync id; a mismatch means our
      // view of the group differs from the broker's answer.
      if (group.pingFuture && *group.pingFuture != group.syncId) {
        dropMembersAndResync();
      }

      sendPing();
    }

    if (updated) {
      // The group changed: fail every all-reduce whose flags are still zero,
      // then forget all of them.
      for (auto& weakHandle : group.activeAllReductions) {
        auto handle = weakHandle.lock();
        if (handle && handle->flags.load(std::memory_order_relaxed) == 0) {
          handle->setException(rpc::Error("AllReduce operation cancelled due to a group change"));
        }
      }
      group.activeAllReductions.clear();
      return true;
    }

    // No group change: sweep the pending all-reductions.
    auto reductionTimeout = rpc->getTimeout();
    // Fresh clock sample, taken after the mutex was acquired.
    auto sweepTime = std::chrono::steady_clock::now();
    bool anyTimedOut = false;

    // True for entries that should be dropped from the list: expired weak
    // references, operations with non-zero flags, and operations that timed out
    // (which are failed here as a side effect).
    auto shouldDrop = [&](auto& weakHandle) {
      auto handle = weakHandle.lock();
      if (!handle || handle->flags.load(std::memory_order_relaxed)) {
        return true;
      }
      if (sweepTime >= handle->timestamp + reductionTimeout) {
        anyTimedOut = true;
        handle->setException(rpc::Error("AllReduce operation timed out"));
        return true;
      }
      return false;
    };

    auto& pending = group.activeAllReductions;
    pending.erase(std::remove_if(pending.begin(), pending.end(), shouldDrop), pending.end());

    if (anyTimedOut) {
      resync(group);
    }

    return false;
  }