in bistro/scheduler/Scheduler.cpp [120:248]
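// Accounts for the resources that currently running tasks hold, and
// collects "orphan" tasks. Subtracts each running task's node resources
// from node_group_packed_resources, appends to orphan_tasks every task
// whose job or node is deleted/disabled, and logs (without killing
// anything) resources or nodes that it cannot match to the config.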
void processRunningTasks(
const Config& config,
Monitor* monitor,
const std::unordered_map<std::pair<Job::ID, Node::ID>, cpp2::RunningTask>&
running_tasks,
const std::unordered_map<int, NodeGroup>& node_groups,
std::vector<cpp2::RunningTask>* orphan_tasks,
NodeGroupToPackedResources* node_group_packed_resources) {
DEFINE_MONITOR_ERROR(monitor, error, "Running task resources");
// Build two temporary views on running_tasks (pointers, not copies).
//
  // Find node "orphans": running tasks whose nodes are deleted/disabled.
  // Use a 'list' for stable iterators, so the orphan search can erase easily.
std::unordered_map<std::string, std::list<const cpp2::RunningTask*>>
node_to_tasks;
// Used to subtract out node resources consumed by running tasks.
std::unordered_map<std::string, std::vector<const cpp2::Resource*>>
node_used_resources;
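  // E.g. (schematically), a task of job "j" on node "n1" that uses 2 units
  // of "cpus" on "n1" yields:
  //   node_to_tasks["n1"] == [&task]
  //   node_used_resources["n1"] == [&Resource(name="cpus", amount=2)]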
for (const auto& id_and_task : running_tasks) {
const auto& rt = id_and_task.second;
    node_to_tasks[*rt.node_ref()].push_front(&rt);
for (const auto& nr : *rt.nodeResources_ref()) {
auto& resources = node_used_resources[*nr.node_ref()];
for (const auto& res : *nr.resources_ref()) {
resources.push_back(&res);
}
}
}
// Subtract out resources used by running tasks from
// node_group_packed_resources, and simultaneously detect orphans (running
// tasks whose jobs or nodes are deleted or disabled). After this pass,
// only orphans will be left in node_to_tasks.
//
  // With replica nodes, a task is considered an orphan only when all of
  // its replicas go down. This isn't really the right behavior, but it's
  // not broken enough to need an urgent fix. Future: model replicas as
  // follows: a task runs on "db1_host1" -> uses "db1" and "host1" resources.
for (auto& lng : node_groups) {
PackedResources& packed_resources =
(*node_group_packed_resources)[lng.first];
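    // packed_resources is a flat array in which each node owns a contiguous
    // block starting at node->offset, one slot per node-group resource
    // (indexed via lng.second.resourceToIndex_).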
for (const auto& node : lng.second.nodes_) {
      // Find orphans: keep only tasks whose jobs are deleted or disabled.
if (node->enabled()) { // For orphans, disabled is the same as deleted.
auto tasks = node_to_tasks.find(node->name());
if (tasks != node_to_tasks.end()) { // Does node have running tasks?
for (auto it = tasks->second.begin(); it != tasks->second.end();) {
auto jobs_it = config.jobs.find(*(*it)->job_ref());
if (jobs_it != config.jobs.end() && jobs_it->second->canRun()) {
it = tasks->second.erase(it); // Not a job or node orphan.
} else {
++it; // Leave orphans in the list.
}
}
}
}
      // Subtract out resources used by the running tasks. We do this
      // outside of the cross-product loop because:
      //  - It lets us account even for tasks from deleted jobs.
      //  - Calling running_tasks.find() for every job-node combination
      //    would be expensive.
      // Note that we don't care if the task's node is disabled -- the task
      // is running, and may be using resources on non-disabled nodes.
auto resources_it = node_used_resources.find(node->name());
if (resources_it != node_used_resources.end()) {
for (const cpp2::Resource* r : resources_it->second) {
auto rsrc_it = lng.second.resourceToIndex_.find(*r->name_ref());
if (rsrc_it == lng.second.resourceToIndex_.end()) {
LOG(ERROR) << error.report(
"Resource ", *r->name_ref(), " not found in the node-group of node ",
node->name(), ": ", debugString(*r)
);
// We log, and ignore the bad resource as if it didn't exist.
// Don't try to kill such tasks, since it can be easy to
// transiently misconfigure resources such that this fires.
continue;
}
auto& amount = packed_resources.at(node->offset + rsrc_it->second);
amount -= *r->amount_ref();
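          // A negative amount means running tasks claim more of this
          // resource than the node provides. Log it, but carry the deficit.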
if (amount < 0) {
LOG(ERROR) << error.report(
"Resource ", *r->name_ref(), " is ", amount, " on node ", node->name(),
" for ", debugString(*r)
);
}
}
// Delete the node now that we accounted for its resources. That
// way, at the end of the loop, only unknown nodes will remain in
// node_used_resources, enabling the warning loop below.
node_used_resources.erase(resources_it);
}
}
}
// Copy the orphan running tasks -- Scheduler will return these.
for (const auto& p : node_to_tasks) {
for (const auto& rt : p.second) {
orphan_tasks->emplace_back(*rt);
}
}
// HACK HACK HACK: Until workers become nodes, we cannot warn about
// unknown worker nodes. Since we don't have a list of worker node names
// handy, identify these nodes by their resource names.
std::unordered_set<std::string> worker_resource_names;
auto worker_level = config.levels.lookup("worker");
CHECK_GE(worker_level, 0);
CHECK_LT(worker_level, config.levelIDToResourceID.size());
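  // The "worker" level must exist and must have a resource list.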
for (auto rid : config.levelIDToResourceID[worker_level]) {
worker_resource_names.emplace(config.resourceNames.lookup(rid));
}
  // Warn about nodes that running tasks use, but that we don't know about.
for (const auto& p : node_used_resources) {
std::string rs;
size_t num_worker_resources = 0;
for (const cpp2::Resource* r : p.second) {
num_worker_resources += worker_resource_names.count(*r->name_ref());
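      // For an unordered_set, count() is 0 or 1, so this tallies how many
      // of this node's resources look like worker resources.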
rs += debugString(*r) + "\n";
}
if (num_worker_resources != 0) {
if (num_worker_resources == p.second.size()) {
continue; // We don't know about workers in this loop yet.
}
rs += "[ERROR: Not all node resources were worker resources]\n"; // WTF?
}
LOG(ERROR) << error.report(
"Running task requires resources on unknown node ", p.first, ":\n", rs
);
}
}