in bistro/remote/RemoteWorkers.cpp [575:662]
void RemoteWorkers::updateInitialWait(RemoteWorkerUpdate* update) {
// Future: Maybe remove everything in update->suicideWorkers_ from the
// worker pools?
std::string msg;
if (!inInitialWait_) {
update->setInitialWaitMessage(std::move(msg));
return;
}
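// kMinSafeWait approximates the longest a worker from before the scheduler
// restart could keep running tasks without the new scheduler hearing about
// them: the worker has to miss its healthchecks, be declared lost as
// unhealthy, and finish committing suicide (including killing its tasks),
// plus some safety margin. Waiting at least this long makes it unlikely
// that the scheduler starts second copies of tasks that are still running.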
const time_t kMinSafeWait =
RemoteWorkerState::maxHealthcheckGap() +
RemoteWorkerState::loseUnhealthyWorkerAfter() +
RemoteWorkerState::workerCheckInterval() + // extra safety gap
RemoteWorkerState::workerSuicideBackoffSafetyMarginSec() +
(RemoteWorkerState::workerSuicideTaskKillWaitMs() / 1000) + 1;
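// min_start_time is the latest scheduler start time for which the chosen
// initial wait has already elapsed: the wait is over once
// startTime_ <= min_start_time.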
time_t min_start_time = update->curTime();
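// A negative --CAUTION_startup_wait_for_workers means "use the computed
// safe default"; a non-negative value overrides it, with warnings below
// if it looks too short.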
if (FLAGS_CAUTION_startup_wait_for_workers < 0) {
min_start_time -= kMinSafeWait;
} else {
min_start_time -= FLAGS_CAUTION_startup_wait_for_workers;
if (RemoteWorkerState::maxHealthcheckGap()
> FLAGS_CAUTION_startup_wait_for_workers) {
msg += folly::to<std::string>(
"DANGER! DANGER! Your --CAUTION_startup_wait_for_workers ",
"of ", FLAGS_CAUTION_startup_wait_for_workers,
" is lower than the max healthcheck gap of ",
RemoteWorkerState::maxHealthcheckGap(), ", which makes it very ",
"likely that you will start second copies of tasks that are ",
"already running (unless your heartbeat interval is much smaller). "
);
} else if (kMinSafeWait > FLAGS_CAUTION_startup_wait_for_workers) {
msg += folly::to<std::string>(
"Your custom --CAUTION_startup_wait_for_workers is ",
"less than the minimum safe value of ", kMinSafeWait,
" -- this increases the risk of starting second copies of tasks ",
"that were already running. "
);
}
}
// Are exactly the same workers connected to the scheduler now as before
// the restart?
bool initial_worker_set_id_consensus =
// The initial worker set ID is the same for all non-MUST_DIE workers,
initialWorkerSetIDs_.end() ==
initialWorkerSetIDs_.upper_bound(*initialWorkerSetIDs_.begin())
// ... and it matches our non-MUST_DIE worker set, meaning that exactly
// the same workers are connected now as the scheduler had before its
// restart.
&& *nonMustDieWorkerSetID_.hash_ref() ==
*initialWorkerSetIDs_.begin()->hash_ref();
if (!initial_worker_set_id_consensus) {
msg += "No initial worker set ID consensus. ";
}
// The scheduler is eligible to exit initial wait if:
// (i) there are no NEW workers, AND
// (ii) --min_startup_wait_for_workers has expired, AND
// (iii) EITHER the wait expired, OR all connected workers have the same
// initial WorkerSetID, which matches the non-MUST_DIE worker set.
//
// If the wait expires, we deliberately do not wait for the WorkerSetID
// consensus, for two reasons. Firstly, people "who know what they are
// doing" need to be able to manually shorten the initial wait. Secondly,
// if the initial wait is safe, there is really no benefit to waiting for
// the consensus -- but it *can* needlessly slow down startup if some
// workers become unhealthy.
if (min_start_time < startTime_ && !initial_worker_set_id_consensus) {
msg += "Waiting for all workers to connect before running tasks.";
// If we are eligible to exit initial wait, but are still querying running
// tasks, then one of the 'new' workers (while transiently unresponsive)
// might be running tasks the scheduler does not know about. To be safe,
// stay in initial wait until all getRunningTasks succeed.
//
// This test is why we cannot call updateInitialWait from processHeartbeat.
} else if (!update->newWorkers().empty()) {
msg += folly::to<std::string>(
"Ready to exit initial wait, but not all workers' running tasks were "
"fetched; not allowing tasks to start until all are fetched."
);
} else {
inInitialWait_ = false;
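// The wait is over; clear any warnings accumulated above so the caller
// receives an empty initial-wait message.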
msg = "";
}
update->setInitialWaitMessage(std::move(msg));
}