void RemoteWorkerRunner::updateConfig()

in bistro/runners/RemoteWorkerRunner.cpp [154:304]
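
Rebuilds the per-worker resource table whenever the configuration is reloaded: each known worker starts from the worker-level resource defaults, is adjusted for the physical resources it reported (RAM, CPU cores, GPU cards), has any manual per-host overrides applied, and finally has the resources claimed by its currently running tasks subtracted.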


void RemoteWorkerRunner::updateConfig(std::shared_ptr<const Config> config) {
  folly::AutoTimer<> timer;
  DEFINE_MONITOR_ERROR(monitor_, error, "RemoteWorkerRunner resource update");

  // Memoize these two values to be used by the next runTask
  workerLevel_ = config->levels.lookup("worker");
  CHECK(workerLevel_ != config->levels.NotFound);
  SYNCHRONIZED(config_) {
    config_ = config;
  }
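  // (SYNCHRONIZED comes from folly/Synchronized.h and holds the lock for
  // the enclosed scope; the closure-based equivalent would be roughly
  // config_.withWLock([&](auto& c) { c = config; }).)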

  const auto& def_resources = config->resourcesByLevel[workerLevel_];
  // Always lock workerResources_ first, then workers_
  SYNCHRONIZED(workerResources_) { SYNCHRONIZED_CONST(workers_) {
    workerResources_.clear();
    // While it __looks__ like we no longer need 'workers_' to be locked
    // after this loop, that lock also ensures that
    // RemoteWorkerRunner::runTaskImpl() gets to mark the task running in
    // the scheduler *BEFORE* the next updateWorkerResources(), so there
    // would be no savings in releasing the lock before we are done here.
    for (const auto& wconn : workers_.workerPool()) {
      const auto& w = wconn.second->getBistroWorker();

      auto& w_res = workerResources_[*w.shard_ref()];
      w_res = def_resources;  // Start with the defaults.

      // Apply any known physical resources.
      for (const auto& prc : config->physicalResourceConfigs) {
        // Apply CPU, RAM, # of GPU cards.
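        // The immediately-invoked lambda below maps this physical resource
        // kind to an optional logical amount; folly::none means "leave the
        // current value in place".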
        if (const auto val = [&]() -> folly::Optional<int> {
              switch (*prc.physical_ref()) {
                case cpp2::PhysicalResource::RAM_MBYTES:
                  return physicalToLogicalResource(
                      prc, *w.usableResources_ref()->memoryMB_ref());
                case cpp2::PhysicalResource::CPU_CORES:
                  return physicalToLogicalResource(
                      prc, *w.usableResources_ref()->cpuCores_ref());
                case cpp2::PhysicalResource::GPU_CARDS:
                  return physicalToLogicalResource(
                      prc, w.usableResources_ref()->gpus_ref()->size());
                default:
                  return folly::none;
              }
            }()) {
          CHECK_GE(*prc.logicalResourceID_ref(), 0);
          CHECK_LT(*prc.logicalResourceID_ref(), w_res.size());
          w_res[*prc.logicalResourceID_ref()] = *val;
        }
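        // (physicalToLogicalResource() converts the measured physical
        // quantity into this resource's logical units; the scaling is
        // driven by the PhysicalResourceConfig passed in.)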
        // If the user configured special resources for GPU card models,
        // populate them too.
        if (*prc.physical_ref() == cpp2::PhysicalResource::GPU_CARDS) {
          for (const auto& gpu : *w.usableResources_ref()->gpus_ref()) {
            auto r_name = "GPU: " + *gpu.name_ref();
            auto rid = config->resourceNames.lookup(r_name);
            if (rid != StringTable::NotFound) {
              CHECK_GE(rid, 0);
              if (rid >= w_res.size()
                  || w_res[rid] == numeric_limits<int>::max()) {
                // Ok to logspam, since it's probably a misconfiguration.
                LOG(WARNING) << "Resource " << r_name << " exists, but is "
                  << "not a worker resource. Ignoring.";
              } else {
                // Let's hope these resources have default limits of 0 :)
                ++w_res[rid];
              }
            }
          }
        }
      }

      // Apply manual worker resource overrides.
      // Try host:port first, then fall back to hostname, then to shard name.
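      // (This map is populated from the deployment config; assuming the
      // standard Bistro JSON settings, an entry looks roughly like
      //   "worker_resources_override": {"host1:1234": {"slots": 2}}
      // with host:port, hostname, or shard name as the key.)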
      auto it = config->workerResourcesOverride.find(folly::to<string>(
          *w.machineLock_ref()->hostname_ref(),
          ':',
          *w.machineLock_ref()->port_ref()));
      if (it == config->workerResourcesOverride.end()) {
        it = config->workerResourcesOverride.find(
            *w.machineLock_ref()->hostname_ref());
      }
      if (it == config->workerResourcesOverride.end()) {
        // I added the shard-name lookup for TestBusiestSelector, but it
        // seems like a reasonable idea in general.
        it = config->workerResourcesOverride.find(*w.shard_ref());
      }

      if (it != config->workerResourcesOverride.end()) {
        for (const auto& p : it->second) {
          w_res[p.first] = p.second;
        }
      }
    }

    // Use the scheduler's view of running tasks, because that's consistent
    // with the other information we're using (even though the running tasks
    // in RemoteWorkers might be more up-to-date).
    //
    // This is essentially a copy of the corresponding loop in Scheduler; it
    // will disappear as soon as workers become nodes.
    auto running_tasks = taskStatuses_->copyRunningTasks();
    for (const auto& id_and_task : running_tasks) {
      const auto& rt = id_and_task.second;
      for (const auto& nr : *rt.nodeResources_ref()) {
        if (*nr.node_ref() != *rt.workerShard_ref()) {
          continue;  // Non-worker node resources are tracked in Scheduler
        }
        auto it = workerResources_.find(*rt.workerShard_ref());
        if (it == workerResources_.end()) {
          LOG(ERROR) << error.report(
              "Resources for unknown worker ",
              *rt.workerShard_ref(),
              " from ",
              debugString(rt));
          break;  // cannot use this running task's resources
        }
        auto& resources = it->second;
        for (const auto& r : *nr.resources_ref()) {
          auto rid = config->resourceNames.lookup(*r.name_ref());
          if (rid == StringTable::NotFound || rid >= resources.size()) {
            LOG(ERROR) << error.report(
                "Resource ",
                *r.name_ref(),
                "/",
                rid,
                " not valid or known for worker ",
                *rt.workerShard_ref(),
                ": ",
                debugString(rt));
            continue;
          }
          resources[rid] -= *r.amount_ref();
          if (resources[rid] < 0) {
            LOG(ERROR) << error.report(
                "Resource ",
                *r.name_ref(),
                " is ",
                resources[rid],
                " on worker ",
                *rt.workerShard_ref(),
                " for ",
                debugString(rt));
          }
        }
      }
    }
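    // At this point workerResources_ holds, per worker, the resources that
    // remain available for new tasks.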
  }}

  if (FLAGS_log_performance) {
    timer.log("Updated RemoteWorkerRunner resources");
  }
}
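
To make the order of operations concrete, below is a minimal, self-contained sketch of the same bookkeeping using plain standard-library types. All names and numbers are hypothetical; it only illustrates that defaults come first, physical measurements overwrite them, manual overrides win over both, and running tasks are subtracted last (which is why a negative balance can only be reported, not prevented):

#include <iostream>
#include <map>
#include <vector>

int main() {
  // Resource IDs index into the per-worker vector, as in workerResources_.
  enum { kCpu = 0, kRam = 1, kNumResources = 2 };

  // 1) Start from the worker-level defaults (def_resources above).
  std::vector<int> res(kNumResources);
  res[kCpu] = 4;
  res[kRam] = 8192;

  // 2) Physical resources reported by the worker replace the defaults,
  //    as the switch over prc.physical does above.
  res[kCpu] = 16;     // e.g. usableResources.cpuCores
  res[kRam] = 65536;  // e.g. usableResources.memoryMB

  // 3) Manual per-worker overrides win over everything configured so far.
  std::map<int, int> overrides = {{kCpu, 8}};
  for (const auto& [rid, amount] : overrides) {
    res[rid] = amount;
  }

  // 4) Finally, subtract what running tasks already claim. Two tasks
  //    claiming 6 cores each overdraw the 8 available, which is the case
  //    the real code can only LOG(ERROR) about after the fact.
  struct RunningTask { int rid; int amount; };
  std::vector<RunningTask> running = {{kCpu, 6}, {kCpu, 6}};
  for (const auto& rt : running) {
    res[rt.rid] -= rt.amount;
    if (res[rt.rid] < 0) {
      std::cerr << "resource " << rt.rid << " is " << res[rt.rid] << "\n";
    }
  }

  std::cout << "cpu=" << res[kCpu] << " ram=" << res[kRam] << "\n";
  return 0;
}

In the real function all of this happens per worker while both workerResources_ and workers_ are held (see the comment at the top of the SYNCHRONIZED block), which is what keeps runTaskImpl()'s bookkeeping from interleaving with a partially rebuilt table.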