void BistroWorkerHandler::runTask()

in bistro/worker/BistroWorkerHandler.cpp [217:371]


/**
 * Validates an incoming runTask request, records the task in runningTasks_,
 * and hands it to taskQueue_ for execution.
 *
 * Steps, in order:
 *  1. throwIfSuicidal() is invoked before anything else.
 *  2. Non-healthcheck tasks are refused unless the worker state is HEALTHY.
 *  3. The scheduler / worker instance IDs are checked through
 *     throwOnInstanceIDMismatch(), except for "new worker" healthchecks
 *     addressed to this worker by a scheduler whose ID differs from the one
 *     in schedulerState_ (those are accepted and logged).
 *  4. For healthchecks, the usable-physical-resources monitor is created,
 *     or its cgroup paths are updated, from opts.cgroupOptions.
 *  5. With state_ and runningTasks_ locked (in that order), non-healthcheck
 *     tasks whose sequence number is lower than
 *     state_.notifyIfTasksNotRunningSequenceNum_ are refused, and the task
 *     is inserted into runningTasks_ keyed by (job, node).
 *  6. The task is passed to taskQueue_.runTask() together with a completion
 *     callback and a physical-resources callback.
 *
 * @param rt      The task to run; its job and node form the runningTasks_
 *                key.
 * @param config  Forwarded to taskQueue_ as the job config argument.
 * @param cmd     Command to run; when empty, {workerCommand_} is used.
 * @param scheduler  Instance ID the caller claims for the scheduler.
 * @param worker     Instance ID the caller believes this worker has.
 * @param notify_if_tasks_not_running_sequence_num  Compared against
 *                state_.notifyIfTasksNotRunningSequenceNum_ to drop tasks
 *                sent before the most recent notifyIfTasksNotRunning.
 * @param opts    Subprocess options; cgroupOptions are read on healthchecks
 *                and the whole struct is forwarded to taskQueue_.
 *
 * @throws BistroWorkerException if the worker is unhealthy, the sequence
 *         number is stale, or an entry with the same (job, node) is already
 *         in runningTasks_.  The helpers called here (throwIfSuicidal,
 *         throwOnInstanceIDMismatch, taskQueue_.runTask) presumably throw
 *         as well -- confirm against their definitions.
 *
 * NOTE: a second runTask carrying the same invocation ID as the entry
 * already in runningTasks_ trips a CHECK rather than throwing.
 */
void BistroWorkerHandler::runTask(
    const cpp2::RunningTask& rt,
    const string& config,
    const vector<string>& cmd,
    const cpp2::BistroInstanceID& scheduler,
    const cpp2::BistroInstanceID& worker,
    int64_t notify_if_tasks_not_running_sequence_num,
    const cpp2::TaskSubprocessOptions& opts) {

  throwIfSuicidal();

  // 100 ms per log => 10 tasks/sec
  AutoTimer<> timer("runTask was slow", std::chrono::milliseconds{100});

  bool isHealthcheck = *rt.job_ref() == kHealthcheckTaskJob;
  // Tells the scheduler that we aren't even going to try running this.  Run
  // healthchecks even if we're unhealthy, because otherwise we'd never be
  // able to recover from missing a healthcheck.
  if (state_->state_ != RemoteWorkerState::State::HEALTHY && !isHealthcheck) {
    throw BistroWorkerException(
        "Unhealthy, not running task: ", *rt.job_ref(), ", ", *rt.node_ref());
  }
  // Accept "new worker" healthchecks even if the scheduler ID doesn't
  // match.  This is important since those healthchecks often arrive just
  // before the initial heartbeat response (so we still have the old
  // scheduler ID).  Accepting them prevents unnecessary worker loss, and
  // speeds up failovers.
  //
  // If multiple schedulers concurrently try to talk to this worker (e.g.
  // due to operational errors), we don't want to blindly accept
  // healthchecks from both, as 50+% of the checks would fail.  Since we
  // only accept "new worker" healthchecks here, this risk is very low.
  //
  // TODO: An alternative is for every heartbeat to generate a fresh
  // 'alternative' scheduler ID on the worker side, which serves to
  // authenticate the scheduler in the common case of the healthcheck's
  // runTask arriving before the next heartbeat.
  if (isHealthcheck && worker == *worker_.id_ref() &&
      *rt.node_ref() == kHealthcheckTaskNewWorkerNode &&
      scheduler != *schedulerState_->id_ref()) {
    LOG(INFO) << "Accepting a 'new worker' healthcheck from an unknown "
      << "scheduler, in the hopes that its heartbeat response is just about "
      << "to arrive.";
    // It may be reasonable to set *scheduler into schedulerState_->id here...
  } else {
    throwOnInstanceIDMismatch("runTask", scheduler, worker);
  }

  if (isHealthcheck) {
    // This means that a worker will not know its usable physical resources
    // until it receives the first healthcheck, and that any cgroup config
    // changes (which should be extremely rare) will only propagate to
    // workers during healthchecks.  On the bright side, this means that
    // cgroup configs *can* be changed at runtime, and that the lock on
    // usablePhysicalResources_ causes a minimal amount of contention, since
    // healthchecks should not be too frequent.
    SYNCHRONIZED(usablePhysicalResources_) {
      // Future: Ideally, the refresh interval could also be changed at
      // run-time, and it would be possible to add a "log" callback to
      // display the new system resources whenever the cgroups change.
      // However, both are too much hassle with the current PeriodicPoller,
      // and it's too much hassle to roll a custom poller.
      if (!usablePhysicalResources_.monitor_) {
        // First healthcheck seen: create the monitor from these options.
        LOG(WARNING) << "CGroups set: "
                     << debugString(*opts.cgroupOptions_ref());
        usablePhysicalResources_.monitor_ =
            std::make_unique<UsablePhysicalResourceMonitor>(
                CGroupPaths(*opts.cgroupOptions_ref(), folly::none),
                FLAGS_physical_resources_subprocess_timeout_ms,
                std::chrono::seconds(
                    FLAGS_refresh_usable_physical_resources_sec));
        usablePhysicalResources_.cgroupOpts_ = *opts.cgroupOptions_ref();
      } else if (
          *opts.cgroupOptions_ref() != usablePhysicalResources_.cgroupOpts_) {
        // The monitor exists but the options differ from the remembered
        // ones: re-point the monitor and remember the new options.
        LOG(WARNING) << "CGroups changed: "
                     << debugString(*opts.cgroupOptions_ref());
        usablePhysicalResources_.monitor_->updateCGroupPaths(
            CGroupPaths(*opts.cgroupOptions_ref(), folly::none));
        usablePhysicalResources_.cgroupOpts_ = *opts.cgroupOptions_ref();
      }
    }
    LOG(INFO) << "Queueing healthcheck started at "
              << *rt.invocationID_ref()->startTime_ref();
  } else {
    LOG(INFO) << "Queueing task: " << debugString(rt);
  }

  // state_ is locked first (const access), then runningTasks_; both stay
  // locked for the staleness check and the insertion below.
  SYNCHRONIZED_CONST(state_) { SYNCHRONIZED(runningTasks_) {
    // Don't run tasks sent before the most recent notifyIfTasksNotRunning,
    // except healthchecks, which don't use notifyIfTasksNotRunning anyway.
    if (
      !isHealthcheck && notify_if_tasks_not_running_sequence_num <
        state_.notifyIfTasksNotRunningSequenceNum_
    ) {
      throw BistroWorkerException(
        "Not running task ", debugString(rt), " since its "
        "notifyIfTasksNotRunning sequence number is ",
        notify_if_tasks_not_running_sequence_num, " < ",
        state_.notifyIfTasksNotRunningSequenceNum_
      );
    }

    // Mark the task as "running", if it was not already running.
    auto it_and_success =
        runningTasks_.emplace(TaskID{*rt.job_ref(), *rt.node_ref()}, rt);
    if (!it_and_success.second) {
      if (isHealthcheck) {
        // The scheduler doesn't track healthchecks, it might send duplicates.
        throw BistroWorkerException("Another healtcheck is already running.");
      }
      CHECK(
          *it_and_success.first->second.invocationID_ref() !=
          *rt.invocationID_ref())
          << "RunningTask was runTask'd more than once: " << debugString(rt);
      // This can happen if updateStatus succeeds on the scheduler, and
      // fails on the worker.  In this case, the scheduler might send the
      // next invocation of the same task before the next updateStatus retry.
      throw BistroWorkerException(
        "Tried to runTask when another invocation of this task is running: ",
        debugString(rt)
      );
    }
  } }

  timer.log("runTask setup was slow");

  taskQueue_.runTask(
    rt,
    cmd.empty() ? vector<string>{workerCommand_} : cmd,
    config,  // Job config argument -- TODO: elide the extra copy?
    jobsDir_ / *rt.job_ref(),  // Working directory for the task
    // Completion callback: enqueue the final status on notifyFinishedQueue_
    // and log the state transition.
    [this](const cpp2::RunningTask& runningTask, TaskStatus&& status) noexcept {
      // 10 tasks / sec
      folly::AutoTimer<> updateQueueTimer(
          "Task update queue was slow", std::chrono::milliseconds{100});
      notifyFinishedQueue_.blockingWrite(std::make_unique<NotifyData>(
          TaskID{*runningTask.job_ref(), *runningTask.node_ref()},
          std::move(status)));
      logStateTransitionFn_("completed_task", worker_, &runningTask);
    },
    // Resources callback: store the reported physical resources on the
    // matching runningTasks_ entry.
    [this](
      const cpp2::RunningTask& runningTask, cpp2::TaskPhysicalResources&& res
    ) noexcept {
      SYNCHRONIZED(runningTasks_) {
        auto it = runningTasks_.find(
            {*runningTask.job_ref(), *runningTask.node_ref()});
        CHECK (it != runningTasks_.end()) << "Bad task: "
                                          << debugString(runningTask);
        // Assigning through the field reference stores the value and marks
        // the optional field as set in one step, replacing the former
        // value_unchecked() + ensure_isset_unsafe() pair.
        it->second.physicalResources_ref() = std::move(res);
      }
    },
    opts
  );
  logStateTransitionFn_("queued_task", worker_, &rt);
}