LogLines LogWriter::getJobLogs()

in bistro/utils/LogWriter.cpp [125:220]


/**
 * Fetches one page of log lines from the `logtype` table.
 *
 * @param logtype       Table to read; must be "stderr", "stdout" or
 *                      "statuses".
 * @param jobs          If non-empty, only rows whose job_id is in this list.
 * @param nodes         If non-empty, only rows whose node_id is in this list.
 * @param line_id       Unless equal to LogLine::kNotALineID, only rows with
 *                      time_and_count >= line_id (ascending) or <= line_id
 *                      (descending) are considered.
 * @param is_ascending  Sort direction on time_and_count.
 * @param limit         Maximum number of rows examined for this page. One
 *                      extra row is fetched to learn whether more exist.
 * @param regex_filter  Only lines for which boost::regex_search() succeeds
 *                      are returned. The filter is applied after the SQL
 *                      LIMIT, so a page may hold fewer than `limit` lines
 *                      (even none) while nextLineID still points at more.
 *
 * @return The matching lines, plus nextLineID: the ID of the first row not
 *         examined, or LogLine::kNotALineID if the query was exhausted.
 *
 * @throws BistroException if `logtype` is not one of the known tables.
 */
LogLines LogWriter::getJobLogs(
    const string& logtype,
    const vector<string>& jobs,
    const vector<string>& nodes,
    int64_t line_id,
    bool is_ascending,
    int limit,
    const std::string& regex_filter) const {

  // The table name is spliced into the SQL text, so only accept known names.
  if (logtype != "stderr" && logtype != "stdout" && logtype != "statuses") {
    throw BistroException("Unknown table for logs: ", logtype);
  }

  // Compose the WHERE clause -- either of "jobs" or "nodes" may be empty
  vector<string> where_clauses;
  // Appends "<column> IN (?,?,...,?)" with one placeholder per value; the
  // values themselves are bound below, never interpolated into the query.
  const auto add_in_clause = [&where_clauses](
      const char* column, size_t num_values) {
    if (num_values == 0) {
      return;
    }
    where_clauses.emplace_back(folly::to<string>(column, " IN ("));
    for (size_t i = 0; i < num_values; ++i) {
      where_clauses.back().append("?,");
    }
    where_clauses.back().back() = ')';  // Replace the trailing comma
  };
  add_in_clause("job_id", jobs.size());
  add_in_clause("node_id", nodes.size());
  if (line_id != LogLine::kNotALineID) {
    where_clauses.emplace_back(folly::to<string>(
      (is_ascending ? "time_and_count >= " : "time_and_count <= "), line_id
    ));
  }
  string where_clause;
  if (!where_clauses.empty()) {
    where_clause = "WHERE (" + folly::join(") AND (", where_clauses) + ")";
  }

  string query = folly::to<string>(
    "SELECT job_id, node_id, time_and_count, line FROM ", logtype, " ",
    // Sorting only by time_and_count gives us a reasonable ability to view
    // logs across multiple jobs or nodes, and uses the primary index.
    where_clause, " ORDER BY time_and_count ", (is_ascending ? "ASC" : "DESC"),
    // Fetch one row past the page to detect whether more rows exist.  The
    // sum is computed in 64 bits so that limit == INT_MAX cannot overflow.
    " LIMIT ", static_cast<int64_t>(limit) + 1
  );

  // Bind the WHERE clause arguments
  auto st = db_->prepare(query);
  auto bind_it = st->bindIterator();
  string debug_where_args{"'; args: '"};  // WHERE args for the logs
  for (const auto& job : jobs) {
    *bind_it++ = job;
    debug_where_args.append(job);
    debug_where_args.append("', '");
  }
  for (const auto& node : nodes) {
    *bind_it++ = node;
    debug_where_args.append(node);
    debug_where_args.append("', '");
  }

  // Run the query.  The timer has to be a named local: an unnamed temporary
  // would be destroyed at the end of its own statement, i.e. before the
  // query below is executed, and so would not time anything.
  folly::AutoTimer<> query_timer(
      folly::to<std::string>("Query: '", query, debug_where_args));
  LogLines res;
  // Unless the look-ahead row shows up, there are no more lines in this
  // direction. This sentinel tells WorkerRunner::getJobLogs that this host
  // does not constrain the aggregate nextLineID.
  res.nextLineID = LogLine::kNotALineID;
  // Assuming that micro-optimizing the "" case is pointless, but did not test.
  boost::regex re(regex_filter);
  // Counts rows returned by the database, whether or not they pass the
  // regex.  Deciding "are there more lines?" from the number of *matching*
  // lines would end pagination early whenever the filter rejects a row.
  int rows_examined = 0;
  for (const auto& r : st->query()) {
    auto id = r.getInt64(2);
    if (rows_examined == limit) {
      // This is the extra look-ahead row: it only marks where the next page
      // starts and is never returned.  Being the last row allowed by LIMIT,
      // the loop ends naturally after it.
      res.nextLineID = id;
      continue;
    }
    ++rows_examined;
    const auto& line = r.getText(3);
    if (!boost::regex_search(line, re)) {
      continue;
    }
    res.lines.emplace_back(
      r.getText(0), r.getText(1), LogLine::timeFromLineID(id), line, id
    );
  }
  LOG(INFO) << "Got " << res.lines.size() << " " << logtype << " lines";
  return res;
}