std::chrono::seconds BistroWorkerHandler::heartbeat()

in bistro/worker/BistroWorkerHandler.cpp [664:759]


std::chrono::seconds BistroWorkerHandler::heartbeat() noexcept {
  if (committingSuicide_.load()) {  // Stop sending heartbeats once dying.
    return std::chrono::seconds(1);
  }
  // The handler's invariant is that that `server_`'s socket is already
  // listening, so the scheduler will have no problem talking back to us.
  try {
    cpp2::SchedulerHeartbeatResponse res;
    // Create a copy of worker and update system resources
    cpp2::BistroWorker worker(worker_);
    SYNCHRONIZED(usablePhysicalResources_) {
      // The monitor is created during the first healthcheck.
      if (usablePhysicalResources_.monitor_) {
        try {
          *worker.usableResources_ref() =
              *usablePhysicalResources_.monitor_->getDataOrThrow();
        } catch (const std::exception& ex) {
          LOG(WARNING) << "Failed to refresh worker's usable physical "
            << "resources: " << ex.what();
        }
      }
    }
    auto scheduler_state = schedulerState_.copy();  // Take the lock only once
    schedulerClientFn_(folly::EventBaseManager::get()->getEventBase())
        ->sync_processHeartbeat(
            res, worker, *scheduler_state.workerSetID_ref());
    enforceWorkerSchedulerProtocolVersion(
        *worker_.protocolVersion_ref(), *res.protocolVersion_ref());
    CHECK(*res.workerSetID_ref()->schedulerID_ref() == *res.id_ref());

    gotNewSchedulerInstance_ = *scheduler_state.id_ref() != *res.id_ref();
    if (gotNewSchedulerInstance_) {
      LOG(INFO) << "Connected to new scheduler " << debugString(res);
      logStateTransitionFn_("connected_to_new_scheduler", worker_, nullptr);
    }

    SYNCHRONIZED(state_) {
      // The worker attempts to run exactly the same state / health model of
      // itself as the one run by the scheduler.  This way, it will stay
      // healthy (or up) for as exactly as long as long as the scheduler
      // wouldn't lose it.
      //
      // To avoid dealing with clock skew for the timeouts, they are all
      // computed in terms of worker time, which is accessed here and in
      // healthcheck() via time(null).  Note that the worker time is taken
      // before the request is sent, so it is logically earlier than the
      // "time heartbeat received" used by the scheduler.
      //
      // As a result, all timeouts on the worker expire just before they
      // would expire on the scheduler, which is the conservative approach
      // to the goal of not running tasks that shouldn't start.
      time_t cur_time = time(nullptr);
      if (gotNewSchedulerInstance_) {
        // The scheduler runs the same update logic in updateNewWorker when
        // a new worker connects.
        state_ = RemoteWorkerState(cur_time);
      }
      state_.timeLastHeartbeatReceived_ = cur_time;
      // Normally, the healthcheck thread overwrites state_.state_, but this
      // is the only way to get out of RemoteWorkerState::State::NEW.
      setState(
          &state_, RemoteWorkerState::State(*res.workerState_ref()), cur_time);

      // Don't allow the worker set version to go backwards. NB: It could be
      // better to also ignore other scheduler state, but this version is
      // updated too rarely to be a useful sequence number.
      if (!gotNewSchedulerInstance_ &&
          WorkerSetIDEarlierThan()(
              *res.workerSetID_ref()->version_ref(),
              *scheduler_state.workerSetID_ref()->version_ref())) {
        LOG(ERROR) << "Got scheduler response with older WorkerSetID "
                   << "version, not updating workerSetID -- current: "
                   << debugString(*scheduler_state.workerSetID_ref())
                   << ", received: " << debugString(*res.workerSetID_ref());
        // Holding back the workerSetID cannot cause schedulerState_.id and
        // schedulerState_.workerSetID.schedulerID to diverge.
        CHECK(
            *scheduler_state.workerSetID_ref()->schedulerID_ref() ==
            *res.id_ref());
        *res.workerSetID_ref() = *scheduler_state.workerSetID_ref();
      }

      // Update scheduler timeouts _inside_ the state_ lock, so that from
      // the point of view of healthcheck(), both the state & timeouts
      // change simultaneously.  As always, first lock state_, then
      // schedulerState_, and don't hold the latter lock.
      SYNCHRONIZED(schedulerState_) {
        schedulerState_ = res;
      }
    }
  } catch (const exception& e) {
    LOG(ERROR) << "Unable to send heartbeat to scheduler: " << e.what();
    logStateTransitionFn_("error_sending_heartbeat", worker_, nullptr);
  }
  return std::chrono::seconds(*worker_.heartbeatPeriodSec_ref());
}