void RemoteWorker::updateState()

in bistro/remote/RemoteWorker.cpp [163:227]


// Recomputes this worker's state as of update->curTime() and records in
// `update` whatever follow-up actions that state calls for.
//
// Effects, in order:
//  - A worker already in MUST_DIE is left untouched: no health checks, no
//    state change, and no repeated suicide request (this runs very often).
//  - The candidate state comes from computeState(); if it reports that the
//    worker could be healthy but lacks WorkerSetID consensus, that is logged.
//  - While the candidate state is NEW, update->addNewWorker() is invoked on
//    every call.
//  - Moving from HEALTHY to anything else stamps state_.timeBecameUnhealthy_.
//  - Moving to MUST_DIE requests suicide, fires deadWorkerCob_, loses the
//    running tasks, stores MUST_DIE, and stops.
//  - Otherwise the new state is stored, a healthcheck is queued if one is
//    due, and 'unsure if running' tasks are re-checked on an exponential
//    backoff.
//
// @param update  Collects the actions decided here; also supplies curTime().
// @param consensus_permits_becoming_healthy  Forwarded to computeState().
void RemoteWorker::updateState(
    RemoteWorkerUpdate* update,
    bool consensus_permits_becoming_healthy) {
  using State = RemoteWorkerState::State;

  if (state_.state_ == State::MUST_DIE) {
    return;  // Don't request suicide here since we updateState very often
  }

  const auto computed =
    computeState(update->curTime(), consensus_permits_becoming_healthy);
  const auto next_state = computed.first;
  if (computed.second) {
    LOG(INFO) << "Worker " << *worker_.shard_ref()
              << " can be healthy but lacks "
              << "WorkerSetID consensus";
  }

  if (next_state == State::NEW) {
    // Careful: RemoteWorkerRunner::checkInitialWait relies on addNewWorker
    // being called on **every** updateState while the worker is NEW.
    // This asks `update` to fetch running tasks from the worker and to send
    // a special first healthcheck (see BistroWorkerHandler::runTask).
    update->addNewWorker(worker_);
    // TODO: This is printed every time we retry fetching running tasks,
    // which looks a bit confusing in the logs.
    LOG(INFO) << "Initializing new worker: " << debugString(worker_);
    // No early return: a regular healthcheck may also be due below.
  }

  const bool was_healthy = state_.state_ == State::HEALTHY;
  const bool now_healthy = next_state == State::HEALTHY;

  if (was_healthy && !now_healthy) {
    LOG(WARNING) << "Worker " << *worker_.shard_ref() << " became unhealthy";
    state_.timeBecameUnhealthy_ = update->curTime();
  }

  if (next_state == State::MUST_DIE) {
    // Ask the worker to commit suicide the moment we declare it lost (the
    // original design also re-sends this on any later heartbeat from it).
    update->requestSuicide(worker_, "Current worker just became lost");
    deadWorkerCob_(*this);
    loseRunningTasks(update);
    setState(State::MUST_DIE);
    return;
  }

  if (!was_healthy && now_healthy) {
    LOG(INFO) << "Worker " << *worker_.shard_ref() << " became healthy";
  }
  setState(next_state);

  // Queue a healthcheck once max(1, FLAGS_healthcheck_period) has elapsed
  // since the previous one was sent.
  const auto next_healthcheck_time =
    timeLastHealthcheckSent_ + max(1, FLAGS_healthcheck_period);
  if (update->curTime() >= next_healthcheck_time) {
    update->healthcheckWorker(worker_);
    timeLastHealthcheckSent_ = update->curTime();
  }

  // 'Unsure if running' tasks are re-checked with exponential backoff: the
  // wait is max(1, FLAGS_unsure_if_running_check_initial_period) shifted
  // left by the number of checks made so far, which stops growing at
  // kMaxBackoffForUnsureIfRunningCheck. An empty set resets the backoff.
  if (unsureIfRunningTasks_.empty()) {
    repeatsOfUnsureIfRunningCheck_ = 0;
    return;
  }
  const auto unsure_check_wait =
    std::max(1, FLAGS_unsure_if_running_check_initial_period)
      << repeatsOfUnsureIfRunningCheck_;
  if (update->curTime()
      >= timeOfLastUnsureIfRunningCheck_ + unsure_check_wait) {
    update->checkUnsureIfRunningTasks(worker_, unsureIfRunningTasks_);
    timeOfLastUnsureIfRunningCheck_ = update->curTime();
    if (repeatsOfUnsureIfRunningCheck_ < kMaxBackoffForUnsureIfRunningCheck) {
      ++repeatsOfUnsureIfRunningCheck_;
    }
  }
}