in bistro/statuses/TaskStatusSnapshot.cpp [36:129]
void TaskStatusSnapshot::updateForConfig(const Config& config) {
// Make a set of current jobs so we can test if jobs were deleted.
unordered_set<int> current_jobs;
for (const auto& pair : config.jobs) {
current_jobs.insert(static_cast<int>(pair.second->id()));
}
  // Look for running tasks that don't have corresponding status entries in
  // rows_. This happens, e.g., when a job gets removed and re-added.
for (const auto& id_and_task : runningTasks_) {
const auto& rt = id_and_task.second;
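    // Resolve the task's job and node names to their interned IDs, then grab
    // a mutable reference to that task's status cell.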
const int job_id = as_const(Job::JobNameTable)->lookup(*rt.job_ref());
const int node_id = as_const(Node::NodeNameTable)->lookup(*rt.node_ref());
auto& status_ref = access(job_id, node_id);
    if (!status_ref.isRunning()) {
      LOG(WARNING) << "Task in runningTasks_ was not marked as running: "
          << status_ref.toJson();
      // Re-mark it as running so the snapshot stays consistent with
      // runningTasks_.
      status_ref = TaskStatus::running();
    }
// Even if the job is deleted from the configuration, don't clear out
// its status just yet.
current_jobs.insert(job_id);
}
// To save RAM, delete status rows for jobs that are not current, and have
// no running tasks. Very deliberately does not modify runningTasks_,
// since we still have to track those tasks until they finish.
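  // Count the rows we clear, and remember the last cleared job ID so the log
  // line below can name a concrete example.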
folly::AutoTimer<> timer;
int count = 0, last_deleted = StringTable::NotFound;
  for (int job_id = 0; job_id < static_cast<int>(rows_.size()); ++job_id) {
if (current_jobs.count(job_id) == 0 && !rows_[job_id].statuses_.empty()) {
rows_[job_id] = StatusRow();
++count;
last_deleted = job_id;
}
}
if (count > 0) {
timer.log(
"Cleared statuses for ", count, " deleted jobs, including ",
as_const(Job::JobNameTable)->lookup(last_deleted)
);
}
// Some current jobs might not have an entry in rows_ yet.
int max_job_id = -1;
for (auto job_id : current_jobs) {
max_job_id = max(job_id, max_job_id);
}
  if (max_job_id >= static_cast<int>(rows_.size())) {
rows_.resize(max_job_id + 1);
}
  // For current jobs whose statuses have not yet been loaded, read them from
  // the TaskStore. TODO: Overall plan for handling TaskStore failures.
//
// Load all the jobs in one batch because the per-call overhead of
// TaskStore::fetchJobTasks can be high for remote DBs.
std::vector<std::string> job_names;
for (auto job_id : current_jobs) {
if (!rows_[job_id].isLoaded_) {
job_names.push_back(Job::JobNameTable->lookup(job_id));
}
}
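  // The callback records one stored (job, node) result per invocation.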
taskStore_->fetchJobTasks(
job_names,
[this](
const string& job,
const string& node,
TaskStore::TaskResult r,
int64_t timestamp) {
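        // Translate the store's string keys back into IDs. The job must
        // already be known (we only fetched current jobs), but the node name
        // may not be in the table yet, so insert it.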
const int job_id = as_const(Job::JobNameTable)->lookup(job);
CHECK(job_id != StringTable::NotFound) << "Job should be known: " << job;
const int node_id = Node::NodeNameTable->insert(node);
auto& status_ref = access(job_id, node_id);
        if (status_ref.isRunning()) {
          LOG(ERROR) << "Refusing to load status " << r << " for running task "
              << job << ", " << node;
          // The live running status wins over whatever the store has.
          return;
        }
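        // Map the persisted result onto a terminal in-memory status.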
if (r == TaskStore::TaskResult::DONE) {
status_ref = TaskStatus::done(timestamp);
} else if (r == TaskStore::TaskResult::FAILED) {
status_ref = TaskStatus::failed();
} else {
LOG(ERROR) << "Bad status " << r << " for " << job << ", " << node;
}
}
);
  // Mark all current jobs as loaded so we don't re-fetch them on the next
  // config update.
  for (auto job_id : current_jobs) {
    rows_[job_id].isLoaded_ = true;
  }
}