in bistro/scheduler/Scheduler.cpp [251:363]
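// One scheduling pass: bucket nodes by level (NodeGroup), pack per-node
// resources, subtract resources held by already-running tasks (collecting
// orphans along the way), build the (job, eligible node) pairs, and hand
// them to the configured SchedulerPolicy, which uses cb to start tasks.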
Scheduler::Result Scheduler::schedule(
time_t cur_time,
const Config& config,
const std::shared_ptr<const Nodes>& nodes,
const TaskStatusSnapshot& status_snapshot,
TaskRunnerCallback cb,
const std::shared_ptr<Monitor> monitor) {
folly::AutoTimer<> timer;
CHECK_EQ(config.levelIDToResourceID.size(), config.resourcesByLevel.size());
// Here and below, level == NodeGroup. Future: rename 'level' throughout.
//
// Group nodes & resources by their NodeGroup -- this speeds up and
// simplifies the rest of the scheduling computation
std::unordered_map<int, NodeGroup> node_groups;
for (size_t ngid = 0; ngid < config.levelIDToResourceID.size(); ++ngid) {
node_groups.emplace(ngid, NodeGroup(config, ngid));
}
for (const auto& node : *nodes) {
node_groups.at(node->level()).nodes_.push_back(node.get());
}
if (FLAGS_log_performance) {
timer.log("Did precomputations that will be eliminated by a refactor");
}
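// Pack each group's per-node resource capacities into contiguous vectors.
// These get decremented for running tasks below, and are later consulted
// by the scheduler policy when placing new tasks.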
NodeGroupToPackedResources node_group_packed_resources;
for (auto& lng : node_groups) {
// Reorder before packing, since that makes deterministic tests easier.
lng.second.reorderNodes(config.nodeOrderType);
lng.second.packResourcesInto(config, &node_group_packed_resources);
}
if (FLAGS_log_performance) {
timer.log("Prepared resource vectors");
}
// Identify orphaned tasks, and subtract running tasks' resources from
// node_group_packed_resources.
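// (An orphan is a running task whose job or node is no longer in the
// current config.)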
Scheduler::Result result;
processRunningTasks(
config,
monitor.get(),
status_snapshot.getRunningTasks(),
node_groups,
&result.orphanTasks_,
&node_group_packed_resources
);
if (FLAGS_log_performance) {
timer.log("Accounted for running task resources");
}
if (!result.orphanTasks_.empty()) {
timer.log("Found ", result.orphanTasks_.size(), " orphan running tasks");
}
// Compute cross product of jobs x nodes.
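// Each JobWithNodes pairs one runnable job with the nodes, at the job's
// task level, on which a new task could start now: not running, done,
// failed, or in backoff; passing the job's node filters; and with every
// dependency finished.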
std::vector<JobWithNodes> job_with_nodes;
for (const auto& job_pair : shuffled(config.jobs)) {
const auto& job = job_pair.second;
if (!job->canRun()) {
continue;
}
job_with_nodes.emplace_back(config, job, &node_group_packed_resources);
auto& job_nodes = job_with_nodes.back();
const auto row = status_snapshot.getRow(job->id());
auto ng_it = node_groups.find(job->levelForTasks());
if (ng_it == node_groups.end()) {
continue;
}
for (const auto& node : ng_it->second.nodes_) {
// Exclude running, finished, failed, and backed-off tasks
// NB It's much cheaper to check isRunning() than runningTasks_.
auto s = row.getPtr(node->id());
if (s && (s->isRunning() || s->isDone() ||
s->isFailed() || s->isInBackoff(cur_time))) {
continue;
}
if (job->shouldRunOn(*node) != Job::ShouldRun::Yes) {
continue;
}
// Skip nodes where the job's dependencies have not all finished yet.
if (!std::all_of(
job->dependencies().cbegin(),
job->dependencies().cend(),
[&status_snapshot, &node](Job::ID id) {
const TaskStatus* st = status_snapshot.getPtr(id, node->id());
return st && st->isDone();
})) {
continue;
}
job_nodes.nodes.push_back(node);
}
}
if (FLAGS_log_performance) {
timer.log("Cross product computed");
}
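// The policy named by config.schedulerPolicyName walks the job/node pairs
// and uses cb to launch tasks, returning how many it scheduled.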
int scheduled_tasks = getSchedulerPolicy(config.schedulerPolicyName)
->schedule(job_with_nodes, cb);
if (scheduled_tasks) {
timer.log("Scheduled ", scheduled_tasks, " tasks");
}
result.areTasksRunning_ =
!status_snapshot.getRunningTasks().empty() || (scheduled_tasks > 0);
return result;
}