in bistro/remote/RemoteWorkers.cpp [468:573]
// Propagates "indirect" WorkerSetID versions between workers.
//
// Walks `history_` from oldest to newest. At each step it replays that
// step's worker-set change (an optional removal, then the additions) into
// `vss`. `vss` holds the (indirectWorkerSetID version, shard) pairs of the
// workers present at that step.
//
// In lock-step, it walks `indirectVersionsOfNonMustDieWorkers_`. Each worker
// whose indirect version equals the current history version gets its
// indirectWorkerSetID replaced by the highest-versioned indirectWorkerSetID
// currently in `vss`.
//
// NOTE(review): the lock-step walk appears to assume that both `history_`
// and `indirectVersionsOfNonMustDieWorkers_` iterate in ascending version
// order (per WorkerSetIDEarlierThan). The CHECKs below enforce consequences
// of that ordering; they do not verify the ordering itself.
//
// Aborts via CHECK on invariant violations, for example:
//   - a worker refers to a version that is absent from `history_`;
//   - a worker refers to a version newer than the last history version;
//   - a tracked worker lacks an indirectWorkerSetID.
void RemoteWorkers::propagateIndirectWorkerSets() {
  auto indir_it = indirectVersionsOfNonMustDieWorkers_.begin();
  // As we move forward in history, store the latest versions of the current
  // workers' indirect worker sets. This can transiently include MUST_DIE
  // workers (they are currently MUST_DIE, but we are not yet at the step in
  // history_ where they are removed), but the `first` (version) field will
  // always be current.
  VersionShardSet vss;
  for (const auto& hp : history_) {
    // We should only rarely run out of workers -- the latest version would
    // have to be unreferenced by any worker, meaning that a worker `w` just
    // became MUST_DIE, and no other workers' `indirectWorkerSetID` has
    // caught up to the version where `w` became MUST_DIE.
    if (indir_it == indirectVersionsOfNonMustDieWorkers_.end()) {
      break;  // No more workers to update.
    }
    CHECK(!WorkerSetIDEarlierThan()(indir_it->first, hp.first))
        << "Worker refers to version not in history -- history v" << hp.first
        << " came before v" << indir_it->first << " from " << indir_it->second;
    // Compute the highest indirect version referenced by workers in the
    // current history step. Note that this will propagate workerSetID
    // dependencies through MUST_DIE workers, but this is harmless as per
    // the note in README.worker_set_consensus.
    if (auto shard_p = hp.second.removed.get_pointer()) {  // Remove, then add
      const auto& w = *mutableWorkerOrAbort(*shard_p);  // Look up shard
      // It is harmless to leave out from `vss` these "in-limbo" workers
      // that have not yet received their first WorkerSetID from this
      // scheduler. Firstly, we are propagating *conservative* estimates
      // of indirect WorkerSetID versions -- and if we magically knew the
      // value for this worker, it could at best *increase* the maximum
      // version in `vss`. Secondly, once the worker gets a version, the
      // next iteration will propagate it properly, fixing the omission.
      if (w.indirectWorkerSetID().has_value()) {
        CHECK(*w.indirectWorkerSetID()->schedulerID_ref() == schedulerID_);
        removeFromVersionShardSet(&vss, *w.indirectWorkerSetID(), *shard_p);
      }
    }
    for (const auto& shard : hp.second.added) {
      const auto& w = *mutableWorkerOrAbort(shard);  // Look up shard
      if (!w.indirectWorkerSetID().has_value()) {
        continue;  // See comment above
      }
      CHECK(*w.indirectWorkerSetID()->schedulerID_ref() == schedulerID_);
      addToVersionShardSet(&vss, *w.indirectWorkerSetID(), shard);
    }
    // For all workers having the current version (from `hp`) as their
    // `indirectWorkerSet`, replace this set with the highest-versioned
    // `indirectWorkerSet` in `vss`.
    while (
      indir_it != indirectVersionsOfNonMustDieWorkers_.end() &&
      !WorkerSetIDEarlierThan()(hp.first, indir_it->first)
    ) {
      // Empty vss means that this is a version in history with *no* workers
      // connected. A worker can clearly never get such a version as its
      // workerSetID(), which means that when vss is empty, there must be no
      // workers to update and we don't enter the loop.
      CHECK(!vss.empty());
      // We never delete intermediate versions from history_
      CHECK_EQ(hp.first, indir_it->first);
      auto it = indir_it;  // Only use `it` and not `indir_it` below.
      ++indir_it;  // So we can safely remove `it`.
      // Get a WorkerSetID for the highest-version `indirectWorkerSet`
      // assigned to any worker at the current step in history (`hp`). The
      // easiest way to look up that worker is by shard.
      //
      // `auto` (not `auto&`) makes `new_id` an independent copy. This
      // matters if the highest-version worker is the very worker `w` that
      // is mutated below.
      auto new_id =
        mutableWorkerOrAbort(vss.rbegin()->second)->indirectWorkerSetID();
      // Can't end up in `vss` without having an indirectWorkerSetID version.
      CHECK(new_id.has_value());
      CHECK_EQ(vss.rbegin()->first, *new_id->version_ref());
      CHECK(*new_id->schedulerID_ref() == schedulerID_);
      // Find the worker to update.
      auto& w = *mutableWorkerOrAbort(it->second);
      // Can't end up in indirectVersionsOfNonMustDieWorkers_ without having
      // an indirectWorkerSetID version. Enforce that explicitly, mirroring
      // the `new_id` check above, before dereferencing the optional.
      CHECK(w.indirectWorkerSetID().has_value());
      CHECK_EQ(it->first, *w.indirectWorkerSetID()->version_ref());
      CHECK_EQ(it->second, *w.getBistroWorker().shard_ref());
      // Since we're about to update `w`, we must also do `vss` -- the `vss`
      // update step above searches for the latest `indirectWorkerSetID`.
      auto vss_it = vss.find({it->first, it->second});
      if (vss_it != vss.end()) {
        vss.erase(vss_it);
        addToVersionShardSet(&vss, *new_id, it->second);
      }
      // Propagate to the current worker the highest-version
      // `indirectWorkerSetID` of all the workers in its current
      // `indirectWorkerSetID`. Also updates
      // `indirectVersionsOfNonMustDieWorkers_`
      //
      // CAUTION: This invalidates `it`.
      updateIndirectWorkerSetVersion(&w, *new_id);
      CHECK(*new_id->version_ref() == *w.indirectWorkerSetID()->version_ref())
        << debugString(*new_id->version_ref())
        << " != " << debugString(*w.indirectWorkerSetID()->version_ref());
    }
  }
  // Every tracked worker must have been matched to some history step;
  // anything left over refers to a version past the end of `history_`.
  CHECK(indir_it == indirectVersionsOfNonMustDieWorkers_.end())
    << "indirectWorkerSetID version " << indir_it->first << " in shard "
    << indir_it->second << " exceeds the maximum history version of "
    << (history_.empty() ? -1 : history_.rbegin()->first);
}