in bistro/runners/RemoteWorkerRunner.cpp [154:304]
// Stores the new config and rebuilds workerResources_ from scratch.
//
// For every worker in workers_.workerPool(), the per-worker resource vector
// is computed in this order:
//   1. copy of the config's default resources for the "worker" level,
//   2. physical resources reported by the worker (RAM, CPU cores, GPU card
//      count), converted via physicalToLogicalResource(),
//   3. one unit per GPU card for any configured "GPU: <model name>" resource,
//   4. manual overrides from config->workerResourcesOverride.
// Afterwards, the amounts used by the running tasks returned by
// taskStatuses_->copyRunningTasks() are subtracted from the worker each task
// is assigned to.
//
// CHECK-fails (fatal) if the config has no "worker" level, or if a physical
// resource config's logical resource ID is outside the worker resource
// vector. Non-fatal inconsistencies are logged and skipped.
void RemoteWorkerRunner::updateConfig(std::shared_ptr<const Config> config) {
// Times the whole update; the result is only logged at the end of the
// function, and only when FLAGS_log_performance is set.
folly::AutoTimer<> timer;
// Introduces `error`, which is used below as error.report(...).
// NOTE(review): presumably reports through monitor_ -- see the macro.
DEFINE_MONITOR_ERROR(monitor_, error, "RemoteWorkerRunner resource update");
// Memoize these two values to be used by the next runTask
workerLevel_ = config->levels.lookup("worker");
// A config without a "worker" level is fatal.
CHECK(workerLevel_ != config->levels.NotFound);
SYNCHRONIZED(config_) {
config_ = config;
}
// Default amounts for worker-level resources; every worker's vector below
// starts out as a copy of this.
const auto& def_resources = config->resourcesByLevel[workerLevel_];
// Always lock workerResources_ first, then workers_
SYNCHRONIZED(workerResources_) { SYNCHRONIZED_CONST(workers_) {
// Start over: only workers currently in the pool get an entry again.
workerResources_.clear();
// While it __looks__ like we no longer need 'workers' to be locked
// after this loop, that lock is actually simultaneously ensuring that
// RemoteWorkerRunner::runTaskImpl() gets to mark the task running in
// the scheduler *BEFORE* the next updateWorkerResources(). So, there
// would be no savings by releasing the lock before we are done here.
// NOTE(review): "updateWorkerResources()" above is presumably an older name
// for this function -- confirm.
for (const auto& wconn : workers_.workerPool()) {
const auto& w = wconn.second->getBistroWorker();
// Entries are keyed by the worker's shard.
auto& w_res = workerResources_[*w.shard_ref()];
w_res = def_resources; // Start with the defaults.
// Apply any known physical resources.
for (const auto& prc : config->physicalResourceConfigs) {
// Apply CPU, RAM, # of GPU cards.
// The immediately-invoked lambda yields the converted amount for the three
// physical resource kinds handled here, and folly::none for any other kind,
// in which case nothing is written to w_res.
if (const auto val = [&]() -> folly::Optional<int> {
switch (*prc.physical_ref()) {
case cpp2::PhysicalResource::RAM_MBYTES:
return physicalToLogicalResource(
prc, *w.usableResources_ref()->memoryMB_ref());
case cpp2::PhysicalResource::CPU_CORES:
return physicalToLogicalResource(
prc, *w.usableResources_ref()->cpuCores_ref());
case cpp2::PhysicalResource::GPU_CARDS:
return physicalToLogicalResource(
prc, w.usableResources_ref()->gpus_ref()->size());
default:
return folly::none;
}
}()) {
// The logical resource ID is used as an index into w_res; an out-of-range
// ID is fatal.
CHECK_GE(*prc.logicalResourceID_ref(), 0);
CHECK_LT(*prc.logicalResourceID_ref(), w_res.size());
w_res[*prc.logicalResourceID_ref()] = *val;
}
// If the user configured special resources for GPU card models,
// populate them too.
if (*prc.physical_ref() == cpp2::PhysicalResource::GPU_CARDS) {
for (const auto& gpu : *w.usableResources_ref()->gpus_ref()) {
// Per-model resources are looked up by the name "GPU: <model name>".
auto r_name = "GPU: " + *gpu.name_ref();
auto rid = config->resourceNames.lookup(r_name);
// Names absent from resourceNames are silently skipped.
if (rid != StringTable::NotFound) {
CHECK_GE(rid, 0);
// NOTE(review): an int-max value presumably marks a resource that does not
// belong to the worker level (cf. the log message) -- confirm against the
// code that fills resourcesByLevel.
if (rid >= w_res.size()
|| w_res[rid] == numeric_limits<int>::max()) {
// Ok to logspam, since it's probably a misconfiguration.
LOG(WARNING) << "Resource " << r_name << " exists, but is "
<< "not a worker resource. Ignoring.";
} else {
// Let's hope these resources have default limits of 0 :)
// Adds one unit per GPU card of this model.
++w_res[rid];
}
}
}
}
}
// Apply manual worker resource overrides.
// Try for hostport, and fallback to hostname, then shard name
// The first key tried has the form "<hostname>:<port>".
auto it = config->workerResourcesOverride.find(folly::to<string>(
*w.machineLock_ref()->hostname_ref(),
':',
*w.machineLock_ref()->port_ref()));
if (it == config->workerResourcesOverride.end()) {
it = config->workerResourcesOverride.find(
*w.machineLock_ref()->hostname_ref());
}
if (it == config->workerResourcesOverride.end()) {
// I added the shard-name lookup for TestBusiestSelector, but it
// seems like a reasonable idea in general.
it = config->workerResourcesOverride.find(*w.shard_ref());
}
if (it != config->workerResourcesOverride.end()) {
// Each override is a (resource index, amount) pair; it replaces whatever
// the defaults and physical resources produced above.
// NOTE(review): p.first is used as an index without a bounds check here --
// presumably validated when the config is parsed; confirm.
for (const auto& p : it->second) {
w_res[p.first] = p.second;
}
}
}
// Use the scheduler's view of running tasks, because that's consistent
// with the other information we're using (while the running tasks in
// RemoteWorkers might be more up-to-date).
//
// This is a moral copy of the corresponding loop in Scheduler. It will
// disappear as soon as workers become nodes.
auto running_tasks = taskStatuses_->copyRunningTasks();
for (const auto& id_and_task : running_tasks) {
const auto& rt = id_and_task.second;
for (const auto& nr : *rt.nodeResources_ref()) {
// Only the entry whose node is the task's own worker shard is handled here.
if (*nr.node_ref() != *rt.workerShard_ref()) {
continue; // These are tracked in Scheduler
}
auto it = workerResources_.find(*rt.workerShard_ref());
if (it == workerResources_.end()) {
LOG(ERROR) << error.report(
"Resources for unknown worker ",
*rt.workerShard_ref(),
" from ",
debugString(rt));
// The lookup key does not depend on `nr`, so no other entry of this task
// could succeed either.
break; // cannot use this running task's resources
}
auto& resources = it->second;
for (const auto& r : *nr.resources_ref()) {
auto rid = config->resourceNames.lookup(*r.name_ref());
// Unknown resource names, and IDs beyond this worker's resource vector,
// are reported and skipped.
if (rid == StringTable::NotFound || rid >= resources.size()) {
LOG(ERROR) << error.report(
"Resource ",
*r.name_ref(),
"/",
rid,
" not valid or known for worker ",
*rt.workerShard_ref(),
": ",
debugString(rt));
continue;
}
resources[rid] -= *r.amount_ref();
// A negative remainder is reported, but the value is left as is.
if (resources[rid] < 0) {
LOG(ERROR) << error.report(
"Resource ",
*r.name_ref(),
" is ",
resources[rid],
" on worker ",
*rt.workerShard_ref(),
" for ",
debugString(rt));
}
}
}
}
// Closes both the workers_ and the workerResources_ synchronized blocks.
}}
if (FLAGS_log_performance) {
timer.log("Updated RemoteWorkerRunner resources");
}
}