void RemoteWorkers::propagateIndirectWorkerSets()

in bistro/remote/RemoteWorkers.cpp [468:573]


// Walks `history_` from its first entry to its last, in lockstep with
// `indirectVersionsOfNonMustDieWorkers_`, and for every worker whose
// indirect WorkerSetID version equals the current history version, replaces
// that worker's `indirectWorkerSetID` with the highest-versioned
// `indirectWorkerSetID` held by any worker tracked in `vss` at that step.
//
// Takes no arguments and returns nothing; the results are written through
// `updateIndirectWorkerSetVersion()`.  Invariant violations (a worker
// referring to a version missing from `history_`, a scheduler ID mismatch,
// an unexpectedly empty `vss`, etc.) are reported via CHECK.
//
// NOTE(review): both containers are presumably ordered by WorkerSetID
// version (consistent with `WorkerSetIDEarlierThan`) -- the lockstep walk
// below relies on that; confirm against their declarations.
void RemoteWorkers::propagateIndirectWorkerSets() {
  auto indir_it = indirectVersionsOfNonMustDieWorkers_.begin();
  // As we move forward in history, store the latest versions of the current
  // workers' indirect worker sets.  This can transiently include MUST_DIE
  // workers (they are currently MUST_DIE, but we are not yet at the step in
  // history_ where they are removed), but the `first` (version) field will
  // always be current.
  //
  // Each element is used below as a (version, shard) pair: `first` is the
  // indirect WorkerSetID version, `second` is the shard of the worker.
  VersionShardSet vss;
  for (const auto& hp : history_) {
    // We should only rarely run out of workers -- the latest version would
    // have to be unreferenced by any worker, meaning that a worker `w` just
    // became MUST_DIE, and no other worker's `indirectWorkerSetID` has
    // caught up to the version where `w` became MUST_DIE.
    if (indir_it == indirectVersionsOfNonMustDieWorkers_.end()) {
      break;  // No more workers to update.
    }

    // The next worker to process must not refer to a version earlier than
    // the current history step -- all workers at earlier versions were
    // consumed by the `while` loop of a preceding iteration, so an earlier
    // version here means it was never present in `history_`.
    CHECK(!WorkerSetIDEarlierThan()(indir_it->first, hp.first))
      << "Worker refers to version not in history -- history v" << hp.first
      << " came before v" << indir_it->first << " from " << indir_it->second;

    // Compute the highest indirect version referenced by workers in the
    // current history step.  Note that this will propagate workerSetID
    // dependencies through MUST_DIE workers, but this is harmless as per
    // the note in README.worker_set_consensus.
    //
    // A history step carries at most one removed shard (`removed` is
    // accessed via `get_pointer()`, which yields null when absent) and a
    // collection of added shards.
    if (auto shard_p = hp.second.removed.get_pointer()) {  // Remove, then add
      const auto& w = *mutableWorkerOrAbort(*shard_p);  // Look up shard
      // It is harmless to leave out from `vss` these "in-limbo" workers
      // that have not yet received their first WorkerSetID from this
      // scheduler.  Firstly, we are propagating *conservative* estimates
      // of indirect WorkerSetID versions -- and if we magically knew the
      // value for this worker, it could at best *increase* the maximum
      // version in `vss`.  Secondly, once the worker gets a version, the
      // next iteration will propagate it properly, fixing the omission.
      if (w.indirectWorkerSetID().has_value()) {
        // Indirect WorkerSetIDs are only meaningful if issued by this
        // scheduler instance.
        CHECK(*w.indirectWorkerSetID()->schedulerID_ref() == schedulerID_);
        removeFromVersionShardSet(&vss, *w.indirectWorkerSetID(), *shard_p);
      }
    }
    for (const auto& shard : hp.second.added) {
      const auto& w = *mutableWorkerOrAbort(shard);  // Look up shard
      if (!w.indirectWorkerSetID().has_value()) {
        continue;  // See comment above
      }
      CHECK(*w.indirectWorkerSetID()->schedulerID_ref() == schedulerID_);
      addToVersionShardSet(&vss, *w.indirectWorkerSetID(), shard);
    }
    // For all workers having the current version (from `hp`) as their
    // `indirectWorkerSetID`, replace this ID with the highest-versioned
    // `indirectWorkerSetID` in `vss`.
    //
    // The loop condition admits every entry whose version is not later than
    // `hp.first`; together with the CHECK before the `vss` update and the
    // CHECK_EQ below, that means exactly the entries at version `hp.first`.
    while (
      indir_it != indirectVersionsOfNonMustDieWorkers_.end() &&
      !WorkerSetIDEarlierThan()(hp.first, indir_it->first)
    ) {
      // Empty vss means that this is a version in history with *no* workers
      // connected.  A worker can clearly never get such a version as its
      // workerSetID(), which means that when vss is empty, there must be no
      // workers to update and we don't enter the loop.
      CHECK(!vss.empty());
      // We never delete intermediate versions from history_
      CHECK_EQ(hp.first, indir_it->first);

      auto it = indir_it;  // Only use `it` and not `indir_it` below.
      ++indir_it;  // So we can safely remove `it`.

      // Get a WorkerSetID for the highest-version `indirectWorkerSet`
      // assigned to any worker at the current step in history (`hp`).  The
      // easiest way to look up that worker is by shard.
      //
      // `auto` deduces a value here, so `new_id` is a copy that does not
      // change when the worker `w` below is updated (which matters if both
      // lookups resolve to the same worker).
      auto new_id =
        mutableWorkerOrAbort(vss.rbegin()->second)->indirectWorkerSetID();
      // Can't end up in `vss` without having an indirectWorkerSetID version.
      CHECK(new_id.has_value());
      CHECK_EQ(vss.rbegin()->first, *new_id->version_ref());
      CHECK(*new_id->schedulerID_ref() == schedulerID_);

      // Find the worker to update.
      auto& w = *mutableWorkerOrAbort(it->second);
      // Can't end up in indirectVersionsOfNonMustDieWorkers_ without having
      // an indirectWorkerSetID version.
      CHECK_EQ(it->first, *w.indirectWorkerSetID()->version_ref());
      CHECK_EQ(it->second, *w.getBistroWorker().shard_ref());
      // Since we're about to update `w`, we must also do `vss` -- the `vss`
      // update step above searches for the latest `indirectWorkerSetID`.
      //
      // The worker's (version, shard) entry is re-keyed only if it is
      // actually present in `vss`; otherwise `vss` is left untouched.
      // NOTE(review): presumably it is absent when the worker is not part
      // of the worker set at this step in history -- confirm.
      auto vss_it = vss.find({it->first, it->second});
      if (vss_it != vss.end()) {
        vss.erase(vss_it);
        addToVersionShardSet(&vss, *new_id, it->second);
      }

      // Propagate to the current worker the highest-version
      // `indirectWorkerSetID` of all the workers in its current
      // `indirectWorkerSetID`.  Also updates
      // `indirectVersionsOfNonMustDieWorkers_`
      //
      // CAUTION: This invalidates `it`.
      updateIndirectWorkerSetVersion(&w, *new_id);
      // Sanity-check that the update took effect on `w`.
      CHECK(*new_id->version_ref() == *w.indirectWorkerSetID()->version_ref())
          << debugString(*new_id->version_ref())
          << " != " << debugString(*w.indirectWorkerSetID()->version_ref());
    }
  }
  // Every worker must have been matched against some history step; a
  // leftover entry refers to a version beyond the end of `history_`.
  //
  // NOTE(review): `indir_it` is dereferenced only in the streamed message,
  // which relies on CHECK evaluating that message solely on failure (i.e.
  // when `indir_it` is not `end()`) -- confirm for the logging library used.
  CHECK(indir_it == indirectVersionsOfNonMustDieWorkers_.end())
    << "indirectWorkerSetID version " << indir_it->first << " in shard "
    << indir_it->second << " exceeds the maximum history version of "
    << (history_.empty() ? -1 : history_.rbegin()->first);
}