in bistro/remote/BusiestRemoteWorkerSelector.cpp [21:116]
TaskRunnerResponse BusiestRemoteWorkerSelector::findWorker(
const Config* config,
const Job& job,
const Node& node,
int worker_level,
Monitor* monitor,
WorkerResources* worker_resources,
RemoteWorkers* workers,
cpp2::BistroWorker* found_worker,
int64_t* did_not_run_sequence_num) noexcept {
DEFINE_MONITOR_ERROR(monitor, error, "BusiestRemoteWorkerSelector");
// Optionally, respect task-locality requirements.
auto hostname = job.requiredHostForTask(node);
const auto& worker_pool = hostname.empty()
? workers->workerPool() : workers->hostWorkerPool(hostname);
const auto& worker_resource_ids =
config->levelIDToResourceID.at(worker_level);
// `int` since ResourceVector stores ints; `int64_t` would also work and
// would reduce overflow risk when weights are large.
using Weight = int;
Weight least_weight = std::numeric_limits<Weight>::max();
RemoteWorker* busiest_worker = nullptr; // Once set, is known to fit this task.
ResourceVector* busiest_worker_rsrc = nullptr;
// If no worker is healthy, don't bother trying to run more tasks.
bool saw_healthy_worker = false;
// Compute weights for workers so that the busiest one has the lowest weight.
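// A worker's weight is the weighted sum of the resources it would have left
// over after running this task, so the "busiest" worker is the one with the
// least leftover weighted capacity. Packing tasks this way presumably keeps
// the idler workers free for larger future tasks.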
for (const auto& p : worker_pool) {
if (!p.second->isHealthy()) {
continue;
}
auto rv_it = worker_resources->find(p.first);
// This can plausibly happen if a new worker was just added, but
// updateConfig has not yet run for it.
if (rv_it == worker_resources->end()) {
LOG(WARNING) << "Not using worker " << p.first << " since it does "
<< "not yet have resources";
continue;
}
saw_healthy_worker = true;
if (!RemoteWorkerSelector::jobCanRunOnWorker(
job, p.second->getBistroWorker(), worker_level
)) {
continue;
}
// If the task fits on the worker, compute the left-over weight.
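// Hypothetical example: given worker-level resources {cpu: 8 free, job needs
// 2, weight 10} and {mem: 16 free, job needs 4, weight 1}, the leftover
// weight is (8 - 2) * 10 + (16 - 4) * 1 = 72. A busier worker with {3 cpu,
// 6 mem} free scores (3 - 2) * 10 + (6 - 4) * 1 = 12 and is thus preferred.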
Weight remaining_weight = 0;
for (auto rid : worker_resource_ids) {
CHECK(rid >= 0 && rid < rv_it->second.size()) << rid;
CHECK(rid < config->resourceIDToWeight.size()) << rid;
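// numeric_limits<int>::max() appears to act as an "unset" sentinel, so a
// usable worker-level resource must have a concrete value here.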
CHECK(rv_it->second[rid] != std::numeric_limits<int>::max()) << rid;
CHECK(rid < job.resources().size()) << rid;
if (rv_it->second[rid] < job.resources()[rid]) {
remaining_weight = -1; // Sentinel: Not enough resources to run task.
break;
}
// Never goes negative: the check above guarantees a nonnegative
// difference, and weights are nonnegative.
remaining_weight += (rv_it->second[rid] - job.resources()[rid])
* config->resourceIDToWeight[rid];
}
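// Strict `<` breaks ties in favor of the worker enumerated first; the
// `>= 0` test filters out the -1 "does not fit" sentinel.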
if (remaining_weight >= 0 && remaining_weight < least_weight) {
least_weight = remaining_weight;
busiest_worker = p.second.get();
busiest_worker_rsrc = &rv_it->second;
}
}
if (!busiest_worker) {
// We weren't able to schedule this task. But future tasks could still
// run if they use fewer resources.
if (saw_healthy_worker) {
return DidNotRunTask;
}
LOG(WARNING) << error.report(
"No healthy workers to run ", job.name(), ", ", node.name()
);
return DoNotRunMoreTasks;
}
CHECK(busiest_worker_rsrc);
*found_worker = busiest_worker->getBistroWorker();
*did_not_run_sequence_num =
busiest_worker->getNotifyIfTasksNotRunningSequenceNum();
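// The sequence number presumably lets the scheduler later confirm, via the
// notifyIfTasksNotRunning protocol, that the worker actually started the task.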
// Update the worker's resource vector.
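// Deducting now means later findWorker() calls that reuse this
// worker_resources map will see this worker's reduced capacity.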
for (auto rid : worker_resource_ids) {
CHECK_GE((*busiest_worker_rsrc)[rid], job.resources()[rid]);
(*busiest_worker_rsrc)[rid] -= job.resources()[rid];
}
return RanTask;
}