std::chrono::milliseconds Bistro::scheduleOnce()

in bistro/Bistro.cpp [44:184]


std::chrono::milliseconds Bistro::scheduleOnce(
    std::chrono::milliseconds time_since_epoch) noexcept {
  DEFINE_MONITOR_ERROR(monitor_.get(), error, "Main loop");

  std::shared_ptr<const Config> config;
  try {
    config = configLoader_->getDataOrThrow();
    if (!config->enabled) {
      LOG(WARNING) << error.report("Not running because enabled is false");
      return config->idleWait;
    }
  } catch (const std::exception& e) {
    LOG(ERROR) << error.report("Error updating config: ", e.what());
    return kSleepOnError;
  }

  // DO: Get the Nodes before getting the Config so that our config is
  // always as new as the one used to fetch the nodes, or newer.  It's not
  // desirable to hold back the config to be exactly the same version as the
  // one used to fetch the nodes, since nodes are polled much less often,
  // and it's best to be maximally responsive to config changes.
  std::shared_ptr<const Nodes> nodes;
  try {
    nodes = nodesLoader_->getDataOrThrow();
    if (nodes->size() == 0) {
      LOG(WARNING) << error.report("Not running because no nodes are defined");
      return config->idleWait;
    }
  } catch (const std::exception& e) {
    LOG(ERROR) << error.report("Error getting nodes: ", e.what());
    return kSleepOnError;
  }

  // Load statuses from TaskStore, free status memory used by deleted jobs.
  taskStatuses_->updateForConfig(*config);
  // Updates remote worker resources.  It would be great to model remote
  // worker resources as regular resources in the scheduler, so that there
  // would be no need to update worker resources separately.
  taskRunner_->updateConfig(config);

  auto status_snapshot = taskStatuses_->copySnapshot();
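  // Run one scheduling pass over the current config, nodes, and task-status
  // snapshot.  The outer callback is invoked for each task the scheduler
  // decides to start; the inner callback records each status update coming
  // back from the task runner.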
  auto sched_result = Scheduler().schedule(
    std::chrono::duration_cast<std::chrono::seconds>(time_since_epoch).count(),
    *config,
    nodes,
    status_snapshot,
    [this, &status_snapshot, config]
    (const JobPtr& job, const Node& node) noexcept {
      // The lifetime of the inner callback is potentially very long, so
      // just copy and capture the data that it needs.
      const auto job_id = job->id();
      const auto node_id = node.id();
      return taskRunner_->runTask(
        *config,
        job,
        node,
        // The previous status, if any.
        status_snapshot.getPtr(job->id(), node.id()),
        [this, job_id, node_id](
          const cpp2::RunningTask& running_task, TaskStatus&& status
        ) noexcept {
          // TODO(#5507329): The reason this exists (as opposed to passing
          // around taskStatuses_) is that for "in-process" updateStatus
          // calls, we always have the numeric node & job IDs available, so
          // this is an unbenchmarked micro-optimization to reuse them
          // instead of looking them up.  The code would be simpler without.
          // For example, RemoteWorker could directly accept a TaskStatuses,
          // thus ensuring atomicity of updates, and avoiding the
          // complicated "RemoteWorkerUpdate" scheme.
          taskStatuses_->updateStatus(
            job_id,
            node_id,
            running_task,
            std::move(status)
          );
        }
      );
    },
    monitor_
  );

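  // Handle orphaned tasks reported by the scheduler: tasks that are still
  // running but whose job/node pair is no longer wanted (e.g. the job was
  // deleted).  Each orphan is killed once it has stayed orphaned longer than
  // the applicable killOrphanTasksAfter delay (per-job if the job still
  // exists, otherwise the deployment-wide default).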
  folly::AutoTimer<> timer;
  // We'll make a new map, discarding any tasks that are no longer orphans.
  decltype(orphanTaskIDToKillTime_) new_id_to_kill_time;
  size_t num_killed = 0;
  if (taskRunner_->canKill()) {
    // Is it time to kill any of the currently orphaned tasks?
    for (auto&& rt : sched_result.orphanTasks_) {
      std::chrono::milliseconds kill_after;
      auto jit = config->jobs.find(*rt.job_ref());
      if (jit == config->jobs.end()) {
        // Job deleted? No problem, default to the deployment-wide policy.
        if (!config->killOrphanTasksAfter.has_value()) {
          continue;
        }
        kill_after = config->killOrphanTasksAfter.value();
      } else {
        if (!jit->second->killOrphanTasksAfter().has_value()) {
          continue;
        }
        kill_after = jit->second->killOrphanTasksAfter().value();
      }
      auto id = std::make_pair(*rt.job_ref(), *rt.node_ref());
      auto it = orphanTaskIDToKillTime_.find(id);
      auto kill_time = (it == orphanTaskIDToKillTime_.end())
        ? (time_since_epoch + kill_after) : it->second;
      if (kill_time > time_since_epoch) {  // Kill later
        new_id_to_kill_time[std::move(id)] = kill_time;
      } else {  // Kill now
        // Future: Consider setting a delayed "kill time" for the
        // just-killed task to ensure that we don't try to kill stubborn
        // tasks in *every* scheduling loop.
        try {
          taskRunner_->killTask(
            rt,
            jit != config->jobs.end()  // Fall back to the global kill request
              ? jit->second->killRequest() : config->killRequest
          );
        } catch (const std::exception& ex) {
          LOG(WARNING) << "Failed to kill orphan task "
            << apache::thrift::debugString(rt) << ": " << ex.what();
        }
        ++num_killed;
      }
    }
  }  // Else: forget all orphans, since the feature was turned off
  orphanTaskIDToKillTime_ = new_id_to_kill_time;
  if (FLAGS_log_performance && !sched_result.orphanTasks_.empty()) {
    timer.log(
      "Processed ", sched_result.orphanTasks_.size(), " orphan tasks and "
      "tried to kill ", num_killed
    );
  }

  if (sched_result.areTasksRunning_) {
    LOG(INFO) << "Working wait...";
    return config->workingWait;
  }
  LOG(INFO) << "Idle wait...";
  return config->idleWait;
}
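
For context, callers are expected to invoke scheduleOnce() repeatedly and sleep
for whatever duration it returns between passes.  The sketch below is a
hypothetical driver loop, not the actual scheduling loop from Bistro.cpp; the
runSchedulerLoop name and the way the current time is obtained are assumptions
for illustration only.

#include <chrono>
#include <thread>

// Hypothetical driver: run one scheduling pass, then wait for the duration the
// pass requested (workingWait, idleWait, or kSleepOnError).
void runSchedulerLoop(Bistro& bistro) {
  while (true) {
    auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now().time_since_epoch());
    std::this_thread::sleep_for(bistro.scheduleOnce(now));
  }
}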