in bistro/remote/RemoteWorker.cpp [163:227]
// Recomputes this worker's state via computeState(update->curTime(),
// consensus_permits_becoming_healthy) and queues follow-up work on `update`.
//
//  - Already MUST_DIE: no-op (health is no longer checked).
//  - New state NEW: queues update->addNewWorker() on every call, then falls
//    through to the healthcheck / unsure-if-running logic below.
//  - Leaving HEALTHY: logs a warning and records timeBecameUnhealthy_.
//  - New state MUST_DIE: requests suicide, invokes deadWorkerCob_, loses the
//    running tasks, sets MUST_DIE and returns without queuing a healthcheck.
//  - Otherwise: stores the new state, queues a healthcheck once
//    std::max(1, FLAGS_healthcheck_period) has passed since the last one, and
//    re-checks 'unsure if running' tasks with exponential backoff.
//
// @param update  Receives curTime() and collects the actions to perform.
// @param consensus_permits_becoming_healthy  Forwarded to computeState(); when
//        computeState reports that only missing consensus blocked HEALTHY,
//        an INFO line is logged.
void RemoteWorker::updateState(
    RemoteWorkerUpdate* update,
    bool consensus_permits_becoming_healthy) {
  using State = RemoteWorkerState::State;

  // MUST_DIE means we don't check health, and ignore state changes.
  if (state_.state_ == State::MUST_DIE) {
    return;  // Don't request suicide here since we updateState very often
  }
  const auto new_state_and_disallowed =
    computeState(update->curTime(), consensus_permits_becoming_healthy);
  const auto new_state = new_state_and_disallowed.first;
  if (new_state_and_disallowed.second) {
    LOG(INFO) << "Worker " << *worker_.shard_ref()
      << " can be healthy but lacks "
      << "WorkerSetID consensus";
  }
  // Careful: RemoteWorkerRunner::checkInitialWait relies on the fact that
  // this check populates addNewWorker **every** updateState.
  if (new_state == State::NEW) {
    // Fetch running tasks from this worker, and send a special first
    // healthcheck (see BistroWorkerHandler::runTask).
    update->addNewWorker(worker_);
    // DO: This is printed every time we retry fetching running tasks, which
    // looks a bit confusing in the logs.
    LOG(INFO) << "Initializing new worker: " << debugString(worker_);
    // Go on below to see if it's time for a healthcheck.
  }
  // Record the moment the worker stops being HEALTHY (this also covers a
  // direct HEALTHY -> MUST_DIE transition, handled just below).
  if (new_state != State::HEALTHY && state_.state_ == State::HEALTHY) {
    LOG(WARNING) << "Worker " << *worker_.shard_ref() << " became unhealthy";
    state_.timeBecameUnhealthy_ = update->curTime();
  }
  if (new_state == State::MUST_DIE) {
    // Send a suicide request the moment we declare the worker lost (and
    // also on any heartbeat we receive from it thereafter).
    update->requestSuicide(worker_, "Current worker just became lost");
    deadWorkerCob_(*this);
    loseRunningTasks(update);
    setState(State::MUST_DIE);
    return;
  }
  if (state_.state_ != State::HEALTHY && new_state == State::HEALTHY) {
    LOG(INFO) << "Worker " << *worker_.shard_ref() << " became healthy";
  }
  setState(new_state);
  // Send out a new healthcheck if we are due. The period is clamped to at
  // least 1 so a zero/negative flag cannot make us healthcheck every call.
  if (update->curTime() >=
      timeLastHealthcheckSent_ + std::max(1, FLAGS_healthcheck_period)) {
    update->healthcheckWorker(worker_);
    timeLastHealthcheckSent_ = update->curTime();
  }
  // Check 'unsure if running' tasks, if they are due. The wait between checks
  // is max(1, initial_period) << repeats, where repeats grows by one per check
  // (capped at kMaxBackoffForUnsureIfRunningCheck) and resets once the set of
  // unsure tasks is empty.
  if (unsureIfRunningTasks_.empty()) {
    repeatsOfUnsureIfRunningCheck_ = 0;  // reset exponential backoff
  } else if (update->curTime() >= timeOfLastUnsureIfRunningCheck_ + (
    std::max(1, FLAGS_unsure_if_running_check_initial_period) <<
      repeatsOfUnsureIfRunningCheck_
  )) {
    update->checkUnsureIfRunningTasks(worker_, unsureIfRunningTasks_);
    timeOfLastUnsureIfRunningCheck_ = update->curTime();
    if (repeatsOfUnsureIfRunningCheck_ < kMaxBackoffForUnsureIfRunningCheck) {
      ++repeatsOfUnsureIfRunningCheck_;
    }
  }
}