LogLines getJobLogsThreadAndEventBaseSafe()

in bistro/runners/RemoteWorkerRunner.cpp [316:435]


// Queries every worker in `services` concurrently for job logs and merges the
// replies into a single page of results.
//
// Parameters:
//   unqueried_workers - if non-empty, a warning line naming these workers is
//                       added to the result, since their logs were not queried.
//   services          - addresses of the workers to query.
//   logtype, jobs, nodes, line_id, is_ascending, regex_filter
//                     - forwarded unchanged to each worker's getJobLogsByID.
//   workerClientFn    - builds a client bound to the EventBase local to this
//                       call.
//
// Returns the merged lines sorted by lineID in the requested direction.
// `nextLineID` is the most restrictive continuation point over all workers, or
// LogLine::kNotALineID if no worker reported a continuation point. A failing
// worker does not make this function throw: its error is logged and added as a
// line whose lineID is LogLine::kNotALineID.
LogLines getJobLogsThreadAndEventBaseSafe(
    const std::string& unqueried_workers,
    const std::vector<cpp2::ServiceAddress>& services,
    const string& logtype,
    const vector<string>& jobs,
    const vector<string>& nodes,
    int64_t line_id,
    bool is_ascending,
    const string& regex_filter,
    RemoteWorkerRunner::WorkerClientFn workerClientFn) {

  LogLines res;
  res.nextLineID = LogLine::kNotALineID;
  time_t cur_time = time(nullptr);  // timestamp for any errors

  if (!unqueried_workers.empty()) {
    auto err = folly::to<string>(
      "Warning: some workers are unhealthy, so we cannot return logs from "
      "them. If your task ever ran on any of the following workers, these "
      "results may be incomplete -- ", unqueried_workers
    );
    LOG(WARNING) << err;
    res.lines.emplace_back("", "", cur_time, err, LogLine::kNotALineID);
  }

  // Ask for at least 100 lines per worker, but try to stay under 5000 lines
  // total. This is loop-invariant, so compute it once; the empty check avoids
  // dividing by zero when there is nobody to query.
  const int lines_per_worker = services.empty()
      ? 0
      : std::max(5000 / static_cast<int>(services.size()), 100);

  // Manages a bunch of concurrent requests to get the logs from the workers.
  // DANGER: Do not replace by getEventBase(), see the docstring.
  folly::EventBase eb;

  auto resultsx = folly::collectAllUnsafe(
      folly::gen::from(services)
      | folly::gen::mapped([&](auto const& addr) {
        return folly::makeFutureWith([&]{
          auto client = workerClientFn(&eb, addr);
          return client->future_getJobLogsByID(
              logtype,
              jobs,
              nodes,
              line_id,
              is_ascending,
              lines_per_worker,
              regex_filter);
        })
        .thenError([&](folly::exception_wrapper&& ew) {
          // NOTE(review): this appends to `res` from a future continuation.
          // Presumably safe because `eb` is private to this call and is only
          // driven by getVia() below on this thread -- confirm it cannot run
          // on another executor.
          auto const service_id = debugString(addr);
          // Logging is done here so that errors are emitted as they happen,
          // rather than all at once at the end.
          auto const err = folly::to<string>(
            "Failed to fetch logs from worker ", service_id, ", this may be ",
            "transient -- but if it is not, report it: ", ew.what()
          );
          LOG(ERROR) << err;
          res.lines.emplace_back("", "", cur_time, err, LogLine::kNotALineID);
          // Re-raise so that this reply is skipped when merging below.
          return folly::makeFuture<cpp2::LogLines>(std::move(ew));
        });
      })
      | folly::gen::as<std::vector>())
    .getVia(&eb);

  // Keep only the successful replies; failures were already reported above.
  // Move each payload out of its Try: a reply can hold thousands of lines, and
  // copying them all is pure waste.
  std::vector<cpp2::LogLines> results;
  results.reserve(resultsx.size());
  for (auto& reply : resultsx) {
    if (reply.hasValue()) {
      results.emplace_back(std::move(reply.value()));
    }
  }

  // Find the most restrictive nextLineID among all hosts: the smallest one
  // when ascending, the largest one when descending.
  for (const auto& log : results) {
    if (
        // This value marks "no more lines" in LogWriter, so exclude it.
        (*log.nextLineID_ref() != LogLine::kNotALineID) &&
        (
            // This line only matches initially due to the previous comparison
            (res.nextLineID == LogLine::kNotALineID) ||
            ((res.nextLineID > *log.nextLineID_ref()) == is_ascending))) {
      res.nextLineID = *log.nextLineID_ref();
    }
  }

  // Merge log lines, dropping those beyond the selected nextLineID.
  //
  // This means that the client is guaranteed log lines that are truly
  // sequential in line_id, making for an API that's easy to use
  // correctly.  In principle, this filtering could be done on the
  // client-side, and the non-sequential lines could be displayed
  // separately, but this seems too confusing for casual users.
  //
  // NOTE(review): when descending, this keeps lines with
  // lineID >= nextLineID, i.e. including nextLineID itself, whereas ascending
  // excludes it. Confirm that this matches LogWriter's paging semantics.
  for (const auto& log : results) {
    for (const cpp2::LogLine& l : *log.lines_ref()) {
      if (
          // If we are on the last page of results on all hosts, drop nothing.
          (res.nextLineID == LogLine::kNotALineID) ||
          ((*l.lineID_ref() < res.nextLineID) == is_ascending)) {
        res.lines.emplace_back(
            *l.jobID_ref(),
            *l.nodeID_ref(),
            *l.time_ref(),
            *l.line_ref(),
            *l.lineID_ref());
      }
    }
  }

  // Sort the merged results from all the workers. The sort is stable so that
  // entries with equal lineIDs keep their insertion order. In particular, the
  // warning and error lines (all LogLine::kNotALineID) stay in the order they
  // were emitted, rather than in an unspecified order.
  std::stable_sort(
    res.lines.begin(),
    res.lines.end(),
    [is_ascending](const LogLine& a, const LogLine& b) {
      return is_ascending ? a.lineID < b.lineID : a.lineID > b.lineID;
    }
  );

  return res;
}