Scheduler::Result Scheduler::schedule()

in bistro/scheduler/Scheduler.cpp [251:363]


Scheduler::Result Scheduler::schedule(
    time_t cur_time,
    const Config& config,
    const std::shared_ptr<const Nodes>& nodes,
    const TaskStatusSnapshot& status_snapshot,
    TaskRunnerCallback cb,
    const std::shared_ptr<Monitor> monitor) {

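  // High-level flow (as implemented below): group nodes and resources by
  // level ("node group"), pack per-node resource capacities into flat
  // vectors, subtract what already-running tasks consume, build the list of
  // nodes each runnable job could use, and finally let the configured
  // scheduler policy pick which tasks to launch via cb.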
  folly::AutoTimer<> timer;

  CHECK(config.levelIDToResourceID.size() == config.resourcesByLevel.size());

  // Here and below, level == NodeGroup. Future: rename 'level' throughout.
  //
  // Group nodes & resources by their NodeGroup -- this speeds up and
  // simplifies the rest of the scheduling computation
  std::unordered_map<int, NodeGroup> node_groups;
  for (int ngid = 0; ngid < config.levelIDToResourceID.size(); ++ngid) {
    node_groups.emplace(ngid, NodeGroup(config, ngid));
  }
  for (const auto& node : *nodes) {
    node_groups.at(node->level()).nodes_.push_back(node.get());
  }
  if (FLAGS_log_performance) {
    timer.log("Did precomputations that will be eliminated by a refactor");
  }

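  // For each node group, pack every node's resource capacities into a flat
  // vector. Running tasks' usage is subtracted from these counts below, and
  // each JobWithNodes keeps a pointer to them for the policy to consume.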
  NodeGroupToPackedResources node_group_packed_resources;
  for (auto& lng : node_groups) {
    // Reorder before packing, since that makes deterministic tests easier.
    lng.second.reorderNodes(config.nodeOrderType);
    lng.second.packResourcesInto(config, &node_group_packed_resources);
  }
  if (FLAGS_log_performance) {
    timer.log("Prepared resource vectors");
  }

  // Identify orphaned tasks, and subtract running tasks' resources from
  // node_group_packed_resources.
  Scheduler::Result result;
  processRunningTasks(
    config,
    monitor.get(),
    status_snapshot.getRunningTasks(),
    node_groups,
    &result.orphanTasks_,
    &node_group_packed_resources
  );
  if (FLAGS_log_performance) {
    timer.log("Accounted for running task resources");
  }
  if (!result.orphanTasks_.empty()) {
    timer.log("Found ", result.orphanTasks_.size(), " orphan running tasks");
  }

  // Compute cross product of jobs x nodes.
  std::vector<JobWithNodes> job_with_nodes;
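  // Jobs are visited in shuffled order, presumably so that no single job is
  // persistently favored when resources are scarce. Each JobWithNodes pairs
  // one runnable job with the nodes still eligible to host it.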
  for (const auto& job_pair : shuffled(config.jobs)) {
    const auto& job = job_pair.second;
    if (!job->canRun()) {
      continue;
    }

    job_with_nodes.emplace_back(config, job, &node_group_packed_resources);
    auto& job_nodes = job_with_nodes.back();

    const auto row = status_snapshot.getRow(job->id());
    auto ng_it = node_groups.find(job->levelForTasks());
    if (ng_it == node_groups.end()) {
      continue;
    }
    for (const auto& node : ng_it->second.nodes_) {
      // Exclude running, finished, failed, and backed-off tasks
      auto s = row.getPtr(node->id());
      // It's much cheaper to check isRunning() than runningTasks_.
      if (s && (s->isRunning() || s->isDone() ||
                s->isFailed() || s->isInBackoff(cur_time))) {
        continue;
      }

      if (job->shouldRunOn(*node) != Job::ShouldRun::Yes) {
        continue;
      }

      // Skip nodes where any of this job's dependencies has not finished yet.
      if (!std::all_of(
        job->dependencies().cbegin(),
        job->dependencies().cend(),
        [&status_snapshot, &node](Job::ID id) {
          const TaskStatus* st = status_snapshot.getPtr(id, node->id());
          return st && st->isDone();
        })) {
        continue;
      }

      job_nodes.nodes.push_back(node);
    }
  }
  if (FLAGS_log_performance) {
    timer.log("Cross product computed");
  }

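  // Delegate the actual placement decisions to the policy named in the
  // config; it consumes the per-job node lists, uses cb to launch the tasks
  // it picks, and returns how many it scheduled.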
  int scheduled_tasks = getSchedulerPolicy(config.schedulerPolicyName)
    ->schedule(job_with_nodes, cb);
  if (scheduled_tasks) {
    timer.log("Scheduled ", scheduled_tasks, " tasks");
  }

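  // Report whether anything is in flight: tasks that were already running,
  // or tasks scheduled in this pass.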
  result.areTasksRunning_ =
    !status_snapshot.getRunningTasks().empty() || (scheduled_tasks > 0);
  return result;
}
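
The packed-resource bookkeeping above is the easiest part to get wrong, so here is a minimal, self-contained sketch of the same "pack capacities, then subtract running tasks" idea. None of these types are Bistro's (PackedResources, subtractRunningTask, and the node-major layout are invented for illustration); the real NodeGroupToPackedResources and processRunningTasks carry more state, but the arithmetic they rely on looks roughly like this:

#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for one node group's packed capacities: all nodes'
// resource counts live in a single node-major vector.
struct PackedResources {
  std::size_t numResources;      // number of resource types per node
  std::vector<int> capacities;   // index = node * numResources + resource

  int& at(std::size_t node, std::size_t resource) {
    return capacities[node * numResources + resource];
  }
};

// Subtract one running task's demands from its node's slots. Returns false
// if the node ends up over-committed (a running task claiming more than the
// node is believed to have); the caller decides how to report that.
bool subtractRunningTask(
    PackedResources& packed,
    std::size_t nodeIdx,
    const std::vector<int>& taskDemands) {
  assert(taskDemands.size() == packed.numResources);
  bool ok = true;
  for (std::size_t r = 0; r < packed.numResources; ++r) {
    packed.at(nodeIdx, r) -= taskDemands[r];
    if (packed.at(nodeIdx, r) < 0) {
      ok = false;  // over-committed
    }
  }
  return ok;
}

int main() {
  // Two nodes, two resource types each (e.g. 4 "cpu" slots, 8 "mem" slots).
  PackedResources packed{2, {4, 8, 4, 8}};
  // A task already running on node 0 holds 1 cpu slot and 2 mem slots.
  bool ok = subtractRunningTask(packed, 0, {1, 2});
  assert(ok);
  assert(packed.at(0, 0) == 3 && packed.at(0, 1) == 6);
  return 0;
}

Whatever remains in the packed vectors after this subtraction is what the scheduler policy gets to hand out to new tasks.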