void update()

in src/accumulator.cc [485:641]


  /// Runs one step of the accumulator's synchronization state machine.
  ///
  /// Each call, in order:
  ///  - asks `groupService` to update the group when `shouldUpdateGroup_` is set
  ///    (done before taking `h->mutex`);
  ///  - under `h->mutex`, polls pending ResultCallback objects via `tryCall()`:
  ///    the leader election result, the next gradient-reduction result (in ring
  ///    order), and the model request result;
  ///  - if `group->syncId` differs from `h->syncId`, discards all per-sync state
  ///    and, for a non-zero syncId, starts a new leader election through
  ///    `allReduceService->allReduce`;
  ///  - commits newly received parameters/buffers, re-requests the model after a
  ///    10 second wait, or forces a resync when a non-leader has not received a
  ///    model for 2 minutes while connected;
  ///  - calls `sendModelUpdates()` once group membership (`members_`) is known.
  ///
  /// NOTE(review): the callbacks installed here capture `this`; this presumes
  /// the accumulator outlives any in-flight allReduce — confirm against owner.
  void update() {
    rpc::AutoGradMode ng(false);

    if (shouldUpdateGroup_) {
      groupService->update(*group, 0, 10 * 1000);
    }

    glock l(h->mutex);

    auto now = std::chrono::steady_clock::now();

    // Debug state dumps are rate-limited to once every 5 seconds.
    bool logit = now - lastlog >= std::chrono::seconds(5);
    if (logit) {
      lastlog = now;
      log.debug("DEBUG syncUpdate()\n");
    }

    // Polls a pending result. If `r` is set and tryCall() reports that its
    // callback was run, `r` is cleared and true is returned; otherwise false.
    // When given a raw pointer (gradient reductions below), clearing only nulls
    // the local pointer copy — the pointed-to ResultCallback is left as is.
    auto result = [&](auto&& r) {
      if (r) {
        if (r->tryCall()) {
          r = {};
          return true;
        }
      }
      return false;
    };

    if (result(findLeaderResult)) {
      if (syncLeader == myName) {
        log.verbose("I am now the leader!\n");
      }
    }

    // Gradient reduction results are consumed in order from a ring buffer.
    // Only move on to the next slot once the current slot's result has actually
    // been delivered; advancing unconditionally would skip a reduction whose
    // result is not ready yet.
    if (!hasGradients_ && !isCopyingGradients) {
      auto& v = gradientReductions[nextGradientReductionResultIndex];
      if (v && result(&v->result)) {
        if (nextGradientReductionResultIndex == gradientReductions.size() - 1) {
          nextGradientReductionResultIndex = 0;
        } else {
          ++nextGradientReductionResultIndex;
        }
      }
    }

    result(requestModelResult);

    // A changed syncId means a new sync round: drop everything tied to the
    // previous round and start electing a leader for the new one.
    if (h->syncId != group->syncId) {
      h->syncId = group->syncId;
      syncLeader.clear();

      findLeaderResult = {};
      for (auto& v : gradientReductions) {
        v = {};
      }
      nextGradientReductionIndex = 0;
      nextGradientReductionResultIndex = 0;
      requestModelResult = {};
      h->requestedModelUpdate.clear();

      h->haveNewParameters = false;
      hasNewUserState_ = false;
      wantsUserState_ = false;

      isFindingLeader = true;
      isWaitingForModel = false;
      hasGradients_ = false;
      members_.clear();
      if (h->syncId == 0) {
        log.verbose("Sync failure, could not join group\n");
      } else {
        log.verbose("Sync %#x success, finding leader\n", h->syncId);

        findLeaderResult = std::make_shared<ResultCallback>();
        AccumulatorFindLeaderType data;
        data.modelVersion = h->modelVersion;
        data.name = myName;
        findLeaderReduce = allReduceService->allReduce(
            group, "Accumulator::findLeader", std::move(data),
            // Reduce op: keep the larger (modelVersion, name) pair in `a`, so the
            // member with the highest model version wins, ties broken by name.
            // Captures nothing: it may run after update() has returned.
            [](AccumulatorFindLeaderType& a, AccumulatorFindLeaderType& b) {
              if (std::tie(a.modelVersion, a.name) < std::tie(b.modelVersion, b.name)) {
                std::swap(a, b);
              }
            },
            // Completion handler: does not touch accumulator state directly, it
            // only installs a callback that a later update() runs via tryCall()
            // while holding h->mutex.
            [findLeaderResult = this->findLeaderResult,
             this](AccumulatorFindLeaderType* v, rpc::Error* error) {
              if (v) {
                *findLeaderResult = [this, modelVersion = v->modelVersion,
                                     leader = std::string(v->name)]() mutable noexcept {
                  isFindingLeader = false;
                  if (syncLeader.empty()) {
                    log.verbose("Leader is %s\n", leader);
                  } else if (syncLeader == leader) {
                    log.verbose("Leader is still %s\n", leader);
                  } else {
                    log.verbose("Leader changed from %s to %s\n", syncLeader, leader);
                  }
                  syncLeader = std::move(leader);
                  {
                    std::lock_guard l(group->mutex);
                    members_ = group->members;
                  }
                  log.verbose("Group has %d members\n", members_.size());
                  // Fetch the model if the elected version differs from ours or
                  // we never received one.
                  if (modelVersion != h->modelVersion || !hasReceivedModel_) {
                    requestModel();
                  }
                };
              } else {
                log.error("findLeader failed with error %s\n", error->what());
                // Only resync if no newer sync round has started meanwhile.
                *findLeaderResult = [this]() mutable {
                  if (h->syncId == group->syncId) {
                    resync();
                  }
                };
              }
            });
      }
    }

    if (h->haveNewParameters) {
      // Unsolicited parameters for a different model version are dropped.
      if (!isWaitingForModel && h->modelVersion != h->newModelVersion) {
        h->haveNewParameters = false;
        log.debug(
            "ignoring unexpected new params due to modelVersion mismatch (got %d, expected %d)\n", h->newModelVersion,
            h->modelVersion);
      } else {
        log.debug("DEBUG new params\n");
        commitModelUpdate();
        isWaitingForModel = false;
      }
    } else if (isWaitingForModel && now - isWaitingForModelTimestamp >= std::chrono::seconds(10)) {
      log.debug("Timed out waiting for model, retrying\n");
      requestModel();
    } else if (
        !isWaitingForModel && connectedImpl() && syncLeader != myName &&
        now - lastReceivedModel >= std::chrono::minutes(2)) {
      log.verbose("EMERGENCY RESYNC TO GET MODEL UPDATE!\n");
      lastReceivedModel = now;
      resync();
    }
    if (h->haveNewBuffers) {
      commitBuffersUpdate();
    }

    if (!members_.empty()) {
      sendModelUpdates();
    }

    if (logit) {
      log.debug("DEBUG connected: %d\n", connectedImpl());
      log.debug("DEBUG isWaitingForModel: %d\n", isWaitingForModel);
      log.debug("DEBUG isFindingLeader: %d\n", isFindingLeader);
      log.debug("DEBUG isCopyingGradients: %d\n", isCopyingGradients);
      log.debug("DEBUG nextGradientReductionIndex: %d\n", nextGradientReductionIndex);
      log.debug("DEBUG nextGradientReductionResultIndex: %d\n", nextGradientReductionResultIndex);
    }
  }