TaskRunnerResponse RemoteWorkerRunner::runTaskImpl()

in bistro/runners/RemoteWorkerRunner.cpp [574:769]


// Tries to start one task of `job` on `node` by handing it to a remote
// worker.
//
// Synchronous part (on the caller's thread):
//  - Returns DoNotRunMoreTasks while inInitialWait_ is set.
//  - CHECK-fails if updateConfig() has not run yet.
//  - Asks the configured RemoteWorkerSelector for a worker. Any response
//    other than RanTask is returned to the caller unchanged, and nothing is
//    dispatched.
//  - Otherwise, while holding workerResources_ and then workers_ (always in
//    that order):
//      * sets rt's workerShard;
//      * writes "worker_node" and "worker_host" into job_args;
//      * calls addNodeResourcesToRunningTask() with rt and
//        job_args["resources_by_node"] (presumably adding the worker's
//        resources to both -- confirm against that helper);
//      * derives cgroup CPU shares (from CPU_CORES / SOFT resources) and a
//        memory limit (from RAM_MBYTES / HARD resources);
//      * records the task as running on the worker and invokes `cb` with a
//        "running" status.
//
// Asynchronous part:
//  - A closure is queued on eventBase_. It creates a worker client and
//    issues the runTask Thrift call.
//  - If the worker rejects the task (BistroWorkerException), or the client
//    or request cannot be created, `cb` receives a neverStarted status.
//  - On any other error, the task is marked "unsure if running" on the
//    worker and `cb` is NOT invoked.
//
// Params:
//   job      -- job to run; its shared_ptr is copied into the async closure.
//   node     -- node the task runs on; passed to the worker selector.
//   rt       -- in/out: receives the worker shard and node resources before
//               being copied into the async closure.
//   job_args -- in/out: the task's JSON args. Mutated as described above,
//               then serialized with folly::toJson for the worker.
//   cb       -- status callback. At every call site in this function it runs
//               while workers_ is locked.
//
// Returns RanTask as soon as the request is queued on eventBase_. This does
// not mean the worker has accepted the task.
TaskRunnerResponse RemoteWorkerRunner::runTaskImpl(
  const std::shared_ptr<const Job>& job,
  const Node& node,
  cpp2::RunningTask& rt,
  folly::dynamic& job_args,
  function<void(const cpp2::RunningTask& rt, TaskStatus&& status)> cb
) noexcept {
  if (inInitialWait_.load(std::memory_order_relaxed)) {
    return DoNotRunMoreTasks;
  }
  auto config = config_.copy();
  CHECK(config) << "Cannot runTask before updateConfig";

  // Make a copy so we don't have to lock workers_ while waiting for Thrift.
  cpp2::BistroWorker worker;
  // unused initialization to hush a Clang warning:
  int64_t did_not_run_sequence_num = 0;
  // These will modify the task's cgroupOptions if we find a worker.
  // NOTE(review): if no matching physical resource is found they stay 0,
  // and 0 is still written into the subprocess options below.
  int16_t cgroup_cpu_shares = 0;
  int64_t cgroup_memory_limit_in_bytes = 0;
  // Always lock workerResources_ first, then workers_
  SYNCHRONIZED(workerResources_) {
    SYNCHRONIZED(workers_) {
      CHECK(workerLevel_ != StringTable::NotFound)
        << "updateConfig must always be invoked before runTask";
      // Whichever selector is currently in use is protected by the locks on
      // WorkerResources and RemoteWorkers, so no extra mutex is needed.
      auto response = RemoteWorkerSelector::getSingleton(
        // Since we use a memoized config, the selector type might be a bit
        // stale, but that's no big deal.  We need the stale config anyway,
        // since its worker level ID is the correct index into
        // workerResources_.
        config->remoteWorkerSelectorType
      )->findWorker(
        config.get(),
        *job,
        node,
        workerLevel_,  // Needed for checking worker-level job filters.
        monitor_.get(),
        &workerResources_,
        &workers_,
        &worker,
        &did_not_run_sequence_num
      );
      // No suitable worker (or the selector asked us to stop): nothing was
      // recorded or dispatched, so just propagate the selector's verdict.
      if (response != RanTask) {
        return response;
      }
      *rt.workerShard_ref() = *worker.shard_ref();
      job_args["worker_node"] = *rt.workerShard_ref();
      job_args["worker_host"] = *worker.machineLock_ref()->hostname_ref();

      // Add worker resources to rt **before** recording this task as running.
      // Future: support this in LocalRunner mode as well.
      auto& resources_by_node = job_args["resources_by_node"];
      CHECK(
          resources_by_node.find(*rt.workerShard_ref()) ==
          resources_by_node.items().end())
          << "Cannot have both a node and a worker named: "
          << *rt.workerShard_ref()
          << " -- if you are running a worker and the central scheduler on the "
          << "same host, you should specify --instance_node_name global.";
      if (const auto* nr = addNodeResourcesToRunningTask(
              &rt,
              &resources_by_node,
              // This config is stale, but it's consistent with
              // workerResources_.
              *config,
              *rt.workerShard_ref(),
              workerLevel_,
              job->resources())) {
        // Translate logical worker resources that map onto physical ones
        // into cgroup settings: hard-enforced RAM becomes a memory limit,
        // soft-enforced CPU becomes cpu.shares.
        for (const auto& r : *nr->resources_ref()) {
          auto phys_it = config->logicalToPhysical.find(*r.name_ref());
          if (phys_it != config->logicalToPhysical.end()) {
            const auto& p =  // at() is like CHECK, since this is `noexcept`
              config->physicalResourceConfigs.at(phys_it->second);
            if (*p.physical_ref() == cpp2::PhysicalResource::RAM_MBYTES &&
                *p.enforcement_ref() ==
                    cpp2::PhysicalResourceEnforcement::HARD) {
              logicalToIntPhysicalResource(
                &cgroup_memory_limit_in_bytes, p, r
              );
            } else if (
                *p.physical_ref() == cpp2::PhysicalResource::CPU_CORES &&
                *p.enforcement_ref() ==
                    cpp2::PhysicalResourceEnforcement::SOFT) {
              logicalToIntPhysicalResource(&cgroup_cpu_shares, p, r);
              // Multiply by 2 here since cgroup cpu.shares must be >= 2.
              clampPhysical(&cgroup_cpu_shares, 2*cgroup_cpu_shares, r);
            }
          }
        }
      }

      // Mark the task 'running' before we unlock workerResources_, because
      // otherwise an intervening updateConfig() could free the resources we
      // just grabbed.
      auto status = TaskStatus::running(
          // Pass some additional metadata to TaskStatusObservers that log
          make_shared<folly::dynamic>(
              folly::dynamic::object("shard", *worker.shard_ref())(
                  "hostname", *worker.machineLock_ref()->hostname_ref())(
                  "port", *worker.machineLock_ref()->port_ref())));
      workers_.mutableWorkerOrAbort(*worker.shard_ref())
          ->recordRunningTaskStatus(rt, status);
      // IMPORTANT: Update TaskStatuses **inside** the workers_ lock,
      // otherwise e.g.  a worker could get lost before cb runs, failing the
      // TaskStatusSnapshot::updateStatus() check for "when a task is not
      // running, the received status is not overwritable".
      cb(rt, std::move(status));
    }
  }

  // Now we have a healthy worker with resources we already grabbed. It may
  // get marked unhealthy or even lost, since workers_ is now unlocked, but
  // we just have to try our luck.
  //
  // Everything is captured by value: the closure is queued on eventBase_
  // and may run after this function has returned, when the caller's `rt`
  // and `job_args` references may no longer be valid.
  eventBase_->runInEventBaseThread([
      cb, this, job, rt, job_args, worker, did_not_run_sequence_num,
      cgroup_cpu_shares, cgroup_memory_limit_in_bytes
    ]() noexcept {
    try {
      shared_ptr<cpp2::BistroWorkerAsyncClient> client =
          workerClientFn_(eventBase_.get(), *worker.addr_ref());
      auto task_subproc_opts = job->taskSubprocessOptions();
      *task_subproc_opts.cgroupOptions_ref()->cpuShares_ref() =
          cgroup_cpu_shares;
      *task_subproc_opts.cgroupOptions_ref()->memoryLimitInBytes_ref() =
          cgroup_memory_limit_in_bytes;
      client->runTask(
          unique_ptr<RequestCallback>(new FunctionReplyCallback(
              // `client` is captured to keep the Thrift client alive until
              // the reply arrives. did_not_run_sequence_num is deliberately
              // NOT captured: the reply handler never reads it.
              [ this, cb, client, rt, worker ](
                  ClientReceiveState && state) noexcept {
                // TODO(#5025478): Convert this, and the other recv_* calls in
                // this file to use recv_wrapped_*.
                try {
                  client->recv_runTask(state);
                } catch (const cpp2::BistroWorkerException& e) {
                  LOG(ERROR) << "Worker never started task " << debugString(rt)
                             << ": " << *e.message_ref();
                  // Okay to mark the task "not running" since we know for sure
                  // that the worker received & processed our request, and
                  // decided not to run it.
                  SYNCHRONIZED(workers_) {
                    auto status = TaskStatus::neverStarted(*e.message_ref());
                    workers_.mutableWorkerOrAbort(*worker.shard_ref())
                        ->recordFailedTask(rt, status);
                    // IMPORTANT: Update TaskStatuses **inside** the workers_
                    // lock
                    cb(rt, std::move(status));
                  }
                } catch (const std::exception& e) {
                  // The task may or may not be running, so do NOT invoke the
                  // callback, or the scheduler might schedule duplicate tasks,
                  // exceed resource limits, etc.
                  LOG(ERROR) << "The runTask request hit an error on "
                             << debugString(rt)
                             << ", will have to poll to find if the "
                                "task is running: "
                             << e.what();
                  SYNCHRONIZED(workers_) {
                    workers_.mutableWorkerOrAbort(*worker.shard_ref())
                        ->addUnsureIfRunningTask(rt);
                  }
                }
              })),
          rt,
          folly::toJson(job_args),
          job->command().empty()
              ? std::vector<std::string>{/*--worker_command*/}
              : job->command(),
          schedulerID_,
          *worker.id_ref(),
          // The real sequence number may have been incremented after we found
          // the worker, but this is okay, the worker simply rejects the task.
          did_not_run_sequence_num,
          std::move(task_subproc_opts));
    } catch (const exception& e) {
      // We can get here if client creation failed (e.g. AsyncSocket could
      // not resolve the hostname), or if the runTask request creation
      // failed.  The latter can __probably__ only happen if the connection
      // is dead at the time of the request.
      //
      // The key part is that in both cases, it ought to be true that the
      // request never reached the worker, and since the task is not
      // running, we can invoke the callback.
      LOG(ERROR) << "Error connecting to the worker: " << e.what();
      SYNCHRONIZED(workers_) {
        auto status = TaskStatus::neverStarted(e.what());
        workers_.mutableWorkerOrAbort(*worker.shard_ref())
            ->recordFailedTask(rt, status);
        // IMPORTANT: Update TaskStatuses **inside** the workers_ lock
        cb(rt, std::move(status));
      }
    }
  });
  return RanTask;
}