in bistro/worker/BistroWorkerHandler.cpp [664:759]
std::chrono::seconds BistroWorkerHandler::heartbeat() noexcept {
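// The returned duration is the delay until the next heartbeat attempt
// (hence the final return of the worker's heartbeat period below).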
if (committingSuicide_.load()) { // Stop sending heartbeats once dying.
return std::chrono::seconds(1);
}
// The handler's invariant is that `server_`'s socket is already
// listening, so the scheduler will have no problem talking back to us.
try {
cpp2::SchedulerHeartbeatResponse res;
// Create a copy of worker and update system resources
cpp2::BistroWorker worker(worker_);
SYNCHRONIZED(usablePhysicalResources_) {
// The monitor is created during the first healthcheck.
if (usablePhysicalResources_.monitor_) {
try {
*worker.usableResources_ref() =
*usablePhysicalResources_.monitor_->getDataOrThrow();
} catch (const std::exception& ex) {
LOG(WARNING) << "Failed to refresh worker's usable physical "
<< "resources: " << ex.what();
}
}
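// If the monitor does not exist yet, the heartbeat simply reports
// whatever usableResources the worker copy already carried.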
}
auto scheduler_state = schedulerState_.copy(); // Take the lock only once
schedulerClientFn_(folly::EventBaseManager::get()->getEventBase())
->sync_processHeartbeat(
res, worker, *scheduler_state.workerSetID_ref());
enforceWorkerSchedulerProtocolVersion(
*worker_.protocolVersion_ref(), *res.protocolVersion_ref());
CHECK(*res.workerSetID_ref()->schedulerID_ref() == *res.id_ref());
gotNewSchedulerInstance_ = *scheduler_state.id_ref() != *res.id_ref();
if (gotNewSchedulerInstance_) {
LOG(INFO) << "Connected to new scheduler " << debugString(res);
logStateTransitionFn_("connected_to_new_scheduler", worker_, nullptr);
}
SYNCHRONIZED(state_) {
// The worker attempts to run exactly the same state / health model of
// itself as the one run by the scheduler. This way, it will stay
// healthy (or up) for exactly as long as the scheduler would not
// lose it.
//
// To avoid dealing with clock skew for the timeouts, they are all
// computed in terms of worker time, which is accessed here and in
// healthcheck() via time(nullptr). Note that the worker time is taken
// before the request is sent, so it is logically earlier than the
// "time heartbeat received" used by the scheduler.
//
// As a result, all timeouts on the worker expire just before they
// would expire on the scheduler, which is the conservative approach
// to the goal of not running tasks that shouldn't start.
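//
// For example: if cur_time is read as T here and the scheduler stamps
// the heartbeat at T + d (with d >= 0 due to network and processing
// delay), a timeout of K seconds fires at T + K on the worker but only
// at T + d + K on the scheduler, so the worker always gives up first.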
time_t cur_time = time(nullptr);
if (gotNewSchedulerInstance_) {
// The scheduler runs the same update logic in updateNewWorker when
// a new worker connects.
state_ = RemoteWorkerState(cur_time);
}
state_.timeLastHeartbeatReceived_ = cur_time;
// Normally, the healthcheck thread overwrites state_.state_, but this
// is the only way to get out of RemoteWorkerState::State::NEW.
setState(
&state_, RemoteWorkerState::State(*res.workerState_ref()), cur_time);
// Don't allow the worker set version to go backwards. NB: It could be
// better to also ignore other scheduler state, but this version is
// updated too rarely to be a useful sequence number.
if (!gotNewSchedulerInstance_ &&
WorkerSetIDEarlierThan()(
*res.workerSetID_ref()->version_ref(),
*scheduler_state.workerSetID_ref()->version_ref())) {
LOG(ERROR) << "Got scheduler response with older WorkerSetID "
<< "version, not updating workerSetID -- current: "
<< debugString(*scheduler_state.workerSetID_ref())
<< ", received: " << debugString(*res.workerSetID_ref());
// Holding back the workerSetID cannot cause schedulerState_.id and
// schedulerState_.workerSetID.schedulerID to diverge.
CHECK(
*scheduler_state.workerSetID_ref()->schedulerID_ref() ==
*res.id_ref());
*res.workerSetID_ref() = *scheduler_state.workerSetID_ref();
}
// Update scheduler timeouts _inside_ the state_ lock, so that from
// the point of view of healthcheck(), both the state & timeouts
// change simultaneously. As always, first lock state_, then
// schedulerState_, and don't hold the latter lock.
SYNCHRONIZED(schedulerState_) {
schedulerState_ = res;
}
}
} catch (const std::exception& e) {
LOG(ERROR) << "Unable to send heartbeat to scheduler: " << e.what();
logStateTransitionFn_("error_sending_heartbeat", worker_, nullptr);
}
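// On error, fall through to return the normal period, so the next
// invocation simply retries the heartbeat.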
return std::chrono::seconds(*worker_.heartbeatPeriodSec_ref());
}
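
For context, the return value implies that heartbeat() is driven by a repeating background thread that sleeps for whatever duration the last call returned, and (per the invariant noted at the top) is started only after `server_`'s socket is listening. Below is a minimal, self-contained sketch of that pattern; `RepeatingRunner` is a hypothetical helper written for illustration, not Bistro's actual runner.

#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper, for illustration only: calls `fn` in a loop,
// sleeping for whatever duration the previous call returned.
class RepeatingRunner {
 public:
  explicit RepeatingRunner(std::function<std::chrono::seconds()> fn)
      : thread_([this, fn = std::move(fn)]() {
          while (!stop_.load()) {
            // E.g. fn could be [&] { return handler.heartbeat(); }
            std::this_thread::sleep_for(fn());
          }
        }) {}

  ~RepeatingRunner() {
    stop_.store(true);
    thread_.join();  // May wait up to one full period; see note below.
  }

 private:
  std::atomic<bool> stop_{false};  // Declared before thread_ so it is
  std::thread thread_;             // initialized before the loop starts.
};

A production runner would additionally make the sleep interruptible (e.g. via a condition variable) so that shutdown is not delayed by a full heartbeat period.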