void processRunningTasks()

in bistro/scheduler/Scheduler.cpp [120:248]


void processRunningTasks(
    const Config& config,
    Monitor* monitor,
    const std::unordered_map<std::pair<Job::ID, Node::ID>, cpp2::RunningTask>&
      running_tasks,
    const std::unordered_map<int, NodeGroup>& node_groups,
    std::vector<cpp2::RunningTask>* orphan_tasks,
    NodeGroupToPackedResources* node_group_packed_resources) {
  DEFINE_MONITOR_ERROR(monitor, error, "Running task resources");

  // Build two temporary views on running_tasks (pointers, not copies).
  //
  // Find node "orphans": running tasks, whose nodes are deleted/disabled.
  // Use 'list' for stable iterators, so orphan search can erase easily.
  std::unordered_map<std::string, std::list<const cpp2::RunningTask*>>
    node_to_tasks;
  // Used to subtract out node resources consumed by running tasks.
  std::unordered_map<std::string, std::vector<const cpp2::Resource*>>
    node_used_resources;
  for (const auto& id_and_task : running_tasks) {
    const auto& rt = id_and_task.second;
    node_to_tasks[*rt.node_ref()].push_front(&rt);
    for (const auto& nr : *rt.nodeResources_ref()) {
      auto& resources = node_used_resources[*nr.node_ref()];
      for (const auto& res : *nr.resources_ref()) {
        resources.push_back(&res);
      }
    }
  }

  // Subtract out resources used by running tasks from
  // node_group_packed_resources, and simultaneously detect orphans (running
  // tasks whose jobs or nodes are deleted or disabled).  After this pass,
  // only orphans will be left in node_to_tasks.
  //
  // With replica nodes, a task is considered an orphan only when all the
  // replicas go down.  This isn't really the right behavior, but it's not
  // so broken as to need an urgent fix.  Future: model replicas so that a
  // task running on "db1_host1" uses both "db1" and "host1" resources.
  for (auto& lng : node_groups) {
    PackedResources& packed_resources =
      (*node_group_packed_resources)[lng.first];
    for (const auto& node : lng.second.nodes_) {
      // Find orphans: keep only tasks whose jobs are deleted or disabled.
      if (node->enabled()) {  // For orphans, disabled is the same as deleted.
        auto tasks = node_to_tasks.find(node->name());
        if (tasks != node_to_tasks.end()) {  // Does node have running tasks?
          for (auto it = tasks->second.begin(); it != tasks->second.end();) {
            auto jobs_it = config.jobs.find(*(*it)->job_ref());
            if (jobs_it != config.jobs.end() && jobs_it->second->canRun()) {
              it = tasks->second.erase(it);  // Not a job or node orphan.
            } else {
              ++it;  // Leave orphans in the list.
            }
          }
        }
      }
      // Subtract out resources used by the running tasks.  We do this
      // outside of the cross-product loop, because:
      //  - This way, we account even for tasks from deleted jobs
      //  - running_tasks.find() for every job-node combination is expensive
      // Note that we don't care if the task's node is disabled -- the task
      // is running, and may be using resources on non-disabled nodes.
      auto resources_it = node_used_resources.find(node->name());
      if (resources_it != node_used_resources.end()) {
        for (const cpp2::Resource* r : resources_it->second) {
          auto rsrc_it = lng.second.resourceToIndex_.find(*r->name_ref());
          if (rsrc_it == lng.second.resourceToIndex_.end()) {
            LOG(ERROR) << error.report(
              "Resource ", *r->name_ref(), " not found in the node-group of node ",
              node->name(), ": ", debugString(*r)
            );
            // We log, and ignore the bad resource as if it didn't exist.
            // Don't try to kill such tasks, since it can be easy to
            // transiently misconfigure resources such that this fires.
            continue;
          }
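          // packed_resources holds one slot per (node, resource) pair:
          // node->offset is the node's base index into the flat array, and
          // rsrc_it->second selects the resource's slot within that slice.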
          auto& amount = packed_resources.at(node->offset + rsrc_it->second);
          amount -= *r->amount_ref();
          if (amount < 0) {
            LOG(ERROR) << error.report(
              "Resource ", *r->name_ref(), " is ", amount, " on node ", node->name(),
              " for ", debugString(*r)
            );
          }
        }
        // Delete the node now that we accounted for its resources.  That
        // way, at the end of the loop, only unknown nodes will remain in
        // node_used_resources, enabling the warning loop below.
        node_used_resources.erase(resources_it);
      }
    }
  }
  // Copy the orphan running tasks -- Scheduler will return these.
  for (const auto& p : node_to_tasks) {
    for (const auto& rt : p.second) {
      orphan_tasks->emplace_back(*rt);
    }
  }
  // HACK HACK HACK: Until workers become nodes, we cannot warn about
  // unknown worker nodes.  Since we don't have a list of worker node names
  // handy, identify these nodes by their resource names.
  std::unordered_set<std::string> worker_resource_names;
  auto worker_level = config.levels.lookup("worker");
  CHECK_GE(worker_level, 0);
  CHECK_LT(worker_level, config.levelIDToResourceID.size());
  for (auto rid : config.levelIDToResourceID[worker_level]) {
    worker_resource_names.emplace(config.resourceNames.lookup(rid));
  }
  // Warn about nodes mentioned by running tasks, which we don't know about.
  for (const auto& p : node_used_resources) {
    std::string rs;
    size_t num_worker_resources = 0;
    for (const cpp2::Resource* r : p.second) {
      num_worker_resources += worker_resource_names.count(*r->name_ref());
      rs += debugString(*r) + "\n";
    }
    if (num_worker_resources != 0) {
      if (num_worker_resources == p.second.size()) {
        continue;  // We don't know about workers in this loop yet.
      }
      rs += "[ERROR: Not all node resources were worker resources]\n";  // WTF?
    }
    LOG(ERROR) << error.report(
      "Running task requires resources on unknown node ", p.first, ":\n", rs
    );
  }
}
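
A few self-contained sketches of the techniques this function leans on. First, the running_tasks parameter is keyed by a (Job::ID, Node::ID) pair, and the standard library provides no std::hash specialization for std::pair, so such a map needs a caller-supplied hasher. A minimal sketch of that pattern; PairHash and the int keys are hypothetical stand-ins, not Bistro's actual types:

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

// The standard library does not specialize std::hash for std::pair, so a
// pair-keyed unordered_map needs a caller-supplied hasher.  PairHash is a
// hypothetical stand-in, not the hasher Bistro actually uses.
struct PairHash {
  std::size_t operator()(const std::pair<int, int>& p) const {
    return std::hash<int>()(p.first) * 31 + std::hash<int>()(p.second);
  }
};

int main() {
  // Miniature analogue of running_tasks: (job id, node id) -> description.
  std::unordered_map<std::pair<int, int>, std::string, PairHash> running;
  running[{7, 42}] = "job 7 running on node 42";
  std::cout << running.at({7, 42}) << '\n';
}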
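
Second, the orphan pass depends on std::list::erase returning the iterator past the erased element, which is why node_to_tasks stores its tasks in a 'list'. The same erase-while-iterating pattern in miniature, with illustrative data:

#include <iostream>
#include <list>
#include <string>
#include <unordered_set>

int main() {
  // Pretend these are running tasks keyed by job name; jobs the config
  // still knows and can run are kept out of the orphan list.
  std::list<std::string> tasks{"backup", "index", "deleted_job", "report"};
  std::unordered_set<std::string> runnable{"backup", "index", "report"};

  for (auto it = tasks.begin(); it != tasks.end();) {
    if (runnable.count(*it)) {
      it = tasks.erase(it);  // Not an orphan; erase() returns the next one.
    } else {
      ++it;  // Leave orphans in the list, as the scheduler pass does.
    }
  }

  for (const auto& t : tasks) {
    std::cout << "orphan: " << t << '\n';  // Prints: orphan: deleted_job
  }
}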
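
Third, the accounting step indexes a flat vector at node->offset + rsrc_it->second: each node owns a contiguous slice of PackedResources, one slot per resource. A minimal model of that layout, assuming hypothetical names throughout:

#include <cstddef>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a node's bookkeeping: each node owns a
// contiguous slice of the packed vector, one slot per resource type.
struct FakeNode { std::string name; std::size_t offset; };

int main() {
  std::unordered_map<std::string, std::size_t> resource_to_index{
      {"cpu", 0}, {"ram", 1}};
  std::vector<FakeNode> nodes{{"host1", 0}, {"host2", 2}};
  // 2 nodes x 2 resources; each slot starts at the node's capacity.
  std::vector<int> packed{8, 64, 8, 64};

  // Subtract what a running task consumes on host2, as the pass above does.
  const FakeNode& node = nodes[1];
  packed.at(node.offset + resource_to_index.at("cpu")) -= 3;
  packed.at(node.offset + resource_to_index.at("ram")) -= 16;

  // A slot going negative means tasks claim more than the node provides --
  // exactly the condition the original code reports via error.report().
  for (int amount : packed) std::cout << amount << '\n';
}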
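
Finally, node_used_resources doubles as an unknown-node detector: every node the config knows about is erased once it is accounted for, so only unknown nodes survive into the warning loop. The same subtract-and-erase idiom, again with illustrative names:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  // Resource totals claimed by running tasks, keyed by node name.
  std::unordered_map<std::string, int> used{
      {"host1", 3}, {"host2", 1}, {"ghost_host", 2}};
  // Nodes the current config actually knows about.
  std::vector<std::string> known_nodes{"host1", "host2"};

  for (const auto& name : known_nodes) {
    auto it = used.find(name);
    if (it != used.end()) {
      // ... subtract it->second from the node's packed resources here ...
      used.erase(it);  // Accounted for; remove it from the map.
    }
  }

  // Whatever survives the loop references a node we know nothing about.
  for (const auto& p : used) {
    std::cout << "unknown node: " << p.first << '\n';  // ghost_host
  }
}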