void TaskStatusSnapshot::updateForConfig()

in bistro/statuses/TaskStatusSnapshot.cpp [36:129]


void TaskStatusSnapshot::updateForConfig(const Config& config) {
  // Make a set of current jobs so we can test if jobs were deleted.
  unordered_set<int> current_jobs;
  for (const auto& pair : config.jobs) {
    current_jobs.insert(static_cast<int>(pair.second->id()));
  }

  // Look for running tasks that don't have corresponding status entries in
  // rows.  This happens, e.g. when a job gets removed and re-added.
  for (const auto& id_and_task : runningTasks_) {
    const auto& rt = id_and_task.second;
    const int job_id = as_const(Job::JobNameTable)->lookup(*rt.job_ref());
    const int node_id = as_const(Node::NodeNameTable)->lookup(*rt.node_ref());
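    // access() returns a mutable reference to the (job, node) status cell in rows_.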
    auto& status_ref = access(job_id, node_id);
    if (!status_ref.isRunning()) {
      LOG(WARNING) << "Task in runningTasks_ was not marked as running: "
        << status_ref.toJson();
      // Repair the snapshot so it agrees with runningTasks_; leave statuses
      // that are already marked running untouched.
      status_ref = TaskStatus::running();
    }
    // Even if the job is deleted from the configuration, don't clear out
    // its status just yet.
    current_jobs.insert(job_id);
  }

  // To save RAM, delete status rows for jobs that are not current, and have
  // no running tasks.  Very deliberately does not modify runningTasks_,
  // since we still have to track those tasks until they finish.
  folly::AutoTimer<> timer;
  int count = 0, last_deleted = StringTable::NotFound;
  for (int job_id = 0; job_id < static_cast<int>(rows_.size()); ++job_id) {
    if (current_jobs.count(job_id) == 0 && !rows_[job_id].statuses_.empty()) {
      rows_[job_id] = StatusRow();
      ++count;
      last_deleted = job_id;
    }
  }
  if (count > 0) {
    timer.log(
      "Cleared statuses for ", count, " deleted jobs, including ",
      as_const(Job::JobNameTable)->lookup(last_deleted)
    );
  }

  // Some current jobs might not have an entry in rows_ yet.
  int max_job_id = -1;
  for (auto job_id : current_jobs) {
    max_job_id = max(job_id, max_job_id);
  }
  // Compare as int so that max_job_id == -1 (no current jobs) does not get
  // converted to a huge unsigned value and force a resize to 0.
  if (max_job_id >= static_cast<int>(rows_.size())) {
    rows_.resize(max_job_id + 1);
  }

  // For current jobs that have not yet loaded statuses, read the statuses
  // from the TaskStore.  TODO: Overall plan for handling TaskStore failures.
  //
  // Load all the jobs in one batch because the per-call overhead of
  // TaskStore::fetchJobTasks can be high for remote DBs.
  std::vector<std::string> job_names;
  for (auto job_id : current_jobs) {
    if (!rows_[job_id].isLoaded_) {
      job_names.push_back(Job::JobNameTable->lookup(job_id));
    }
  }
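  // fetchJobTasks invokes the callback below once per stored task result.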
  taskStore_->fetchJobTasks(
    job_names,
    [this](
        const string& job,
        const string& node,
        TaskStore::TaskResult r,
        int64_t timestamp) {
      const int job_id = as_const(Job::JobNameTable)->lookup(job);
      CHECK(job_id != StringTable::NotFound) << "Job should be known: " << job;
      const int node_id = Node::NodeNameTable->insert(node);
      auto& status_ref = access(job_id, node_id);
      if (status_ref.isRunning()) {
        LOG(ERROR) << "Refusing to load status " << r << " for running task "
          << job << ", " << node;
        return;  // Keep the in-memory running status; skip the stored one.
      }
      if (r == TaskStore::TaskResult::DONE) {
        status_ref = TaskStatus::done(timestamp);
      } else if (r == TaskStore::TaskResult::FAILED) {
        status_ref = TaskStatus::failed();
      } else {
        LOG(ERROR) << "Bad status " << r << " for " << job << ", " << node;
      }
    }
  );
  // Remember that the statuses for all current jobs are now loaded, so that
  // future calls do not re-fetch them from the TaskStore.
  for (auto job_id : current_jobs) {
    rows_[job_id].isLoaded_ = true;
  }
}
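
The row-pruning step above (drop status rows for jobs that are neither configured nor still running) can be illustrated in isolation. The sketch below is hypothetical: it uses simplified stand-in types (SketchStatusRow, pruneDeletedJobRows) rather than Bistro's StatusRow and StringTable, and only mirrors the current_jobs bookkeeping under the assumption that rows are indexed by dense integer job ids.

#include <string>
#include <unordered_set>
#include <vector>

struct SketchStatusRow {
  bool isLoaded_ = false;
  std::vector<std::string> statuses_;  // stand-in for the per-node statuses
};

// Clears rows whose job id is neither configured nor backed by a running
// task, and returns how many rows were cleared.
int pruneDeletedJobRows(
    std::vector<SketchStatusRow>& rows,
    const std::unordered_set<int>& configured_jobs,
    const std::unordered_set<int>& jobs_with_running_tasks) {
  // The "keep" set plays the role of current_jobs: configured jobs plus jobs
  // that still have running tasks.
  std::unordered_set<int> keep = configured_jobs;
  keep.insert(jobs_with_running_tasks.begin(), jobs_with_running_tasks.end());

  int cleared = 0;
  for (int job_id = 0; job_id < static_cast<int>(rows.size()); ++job_id) {
    if (keep.count(job_id) == 0 && !rows[job_id].statuses_.empty()) {
      rows[job_id] = SketchStatusRow();  // drop the statuses to reclaim memory
      ++cleared;
    }
  }
  return cleared;
}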