void RemoteWorkers::updateInitialWait(RemoteWorkerUpdate* update)

in bistro/remote/RemoteWorkers.cpp [575:662]
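
This routine decides whether the scheduler may leave its post-restart "initial wait" and begin running tasks, and records a human-readable status message on the update. It computes a minimum safe wait, warns if --CAUTION_startup_wait_for_workers is set dangerously low, and exits the wait only when no worker is still NEW (i.e. all running tasks have been fetched) and either the wait has expired or the initial WorkerSetID consensus holds.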


void RemoteWorkers::updateInitialWait(RemoteWorkerUpdate* update) {
  // Future: Maybe remove everything in update->suicideWorkers_ from the
  // worker pools?

  std::string msg;
  if (!inInitialWait_) {
    update->setInitialWaitMessage(std::move(msg));
    return;
  }

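  // A conservative lower bound on the startup wait: the largest allowed
  // healthcheck gap, plus the time after which an unhealthy worker is lost,
  // plus an extra check-interval safety gap, plus the suicide backoff safety
  // margin and the task-kill wait (ms converted to seconds; +1 covers the
  // truncation). Waiting less than this increases the risk of starting
  // second copies of tasks that pre-restart workers may still be running
  // (see the warnings below).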
  const time_t kMinSafeWait =
    RemoteWorkerState::maxHealthcheckGap() +
    RemoteWorkerState::loseUnhealthyWorkerAfter() +
    RemoteWorkerState::workerCheckInterval() +  // extra safety gap
    RemoteWorkerState::workerSuicideBackoffSafetyMarginSec() +
    (RemoteWorkerState::workerSuicideTaskKillWaitMs() / 1000) + 1;

  time_t min_start_time = update->curTime();
  if (FLAGS_CAUTION_startup_wait_for_workers < 0) {
    min_start_time -= kMinSafeWait;
  } else {
    min_start_time -= FLAGS_CAUTION_startup_wait_for_workers;
    if (RemoteWorkerState::maxHealthcheckGap()
        > FLAGS_CAUTION_startup_wait_for_workers) {
      msg += folly::to<std::string>(
        "DANGER! DANGER! Your --CAUTION_startup_wait_for_workers ",
        "of ", FLAGS_CAUTION_startup_wait_for_workers,
        " is lower than the max healthcheck gap of ",
        RemoteWorkerState::maxHealthcheckGap(), ", which makes it very ",
        "likely that you will start second copies of tasks that are ",
        "already running (unless your heartbeat interval is much smaller). "
      );
    } else if (kMinSafeWait > FLAGS_CAUTION_startup_wait_for_workers) {
      msg += folly::to<std::string>(
        "Your custom --CAUTION_startup_wait_for_workers is ",
        "less than the minimum safe value of ", kMinSafeWait,
        " -- this increases the risk of starting second copies of tasks ",
        "that were already running. "
      );
    }
  }

  // Are exactly the same workers connected to the scheduler now as before
  // the restart?
  bool initial_worker_set_id_consensus =
      // The initial worker set ID is the same for all non-MUST_DIE workers,
      initialWorkerSetIDs_.end() ==
          initialWorkerSetIDs_.upper_bound(*initialWorkerSetIDs_.begin())
      // ... and it matches our non-MUST_DIE worker set, meaning that exactly
      // the same workers are connected now as were connected before the
      // scheduler's restart.
      && *nonMustDieWorkerSetID_.hash_ref() ==
          *initialWorkerSetIDs_.begin()->hash_ref();
  if (!initial_worker_set_id_consensus) {
    msg += "No initial worker set ID consensus. ";
  }

  // The scheduler is eligible to exit initial wait if:
  //  (i) there are no NEW workers, AND
  //  (ii) --min_startup_wait_for_workers has expired, AND
  //  (iii) EITHER the wait expired, OR all connected workers have the same
  //        initial WorkerSetID, which matches the non-MUST_DIE worker set.
  //
  // If the wait expires, we deliberately do not wait for the WorkerSetID
  // consensus, for two reasons.  Firstly, people "who know what they are
  // doing" need to be able to manually shorten the initial wait.  Secondly,
  // if the initial wait is safe, there is really no benefit to waiting for
  // the consensus -- but it *can* needlessly slow down startup if some
  // workers become unhealthy.
  if (min_start_time < startTime_ && !initial_worker_set_id_consensus) {
    msg += "Waiting for all workers to connect before running tasks.";
  // If we are eligible to exit initial wait, but are still querying running
  // tasks, then one of the 'new' workers (while transiently unresponsive)
  // might be running tasks the scheduler does not know about.  To be safe,
  // stay in initial wait until all getRunningTasks succeed.
  //
  // This test is why we cannot call updateInitialWait from processHeartbeat.
  } else if (!update->newWorkers().empty()) {
    msg += folly::to<std::string>(
      "Ready to exit initial wait, but not all workers' running tasks were "
      "fetched; not allowing tasks to start until all are fetched."
    );
  } else {
    inInitialWait_ = false;
    msg = "";
  }
  update->setInitialWaitMessage(std::move(msg));
}
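
The consensus test above hinges on a terse multiset idiom: upper_bound(*begin()) equals end() exactly when every element compares equal to the smallest one, i.e. every non-MUST_DIE worker reported the same initial WorkerSetID. Below is a minimal, self-contained sketch of that idiom, with plain ints standing in for the WorkerSetID hashes and an explicit empty-set guard added for the demo; it is illustrative code, not part of bistro.

#include <cassert>
#include <set>

// True when every element of the multiset compares equal to the smallest
// one (an empty set trivially agrees).
template <typename Multiset>
bool allElementsEqual(const Multiset& s) {
  return s.empty() || s.upper_bound(*s.begin()) == s.end();
}

int main() {
  std::multiset<int> agree{7, 7, 7};  // all "initial WorkerSetID hashes" match
  std::multiset<int> split{7, 7, 9};  // one worker disagrees
  assert(allElementsEqual(agree));
  assert(!allElementsEqual(split));
  return 0;
}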