in bistro/runners/RemoteWorkerRunner.cpp [316:435]
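// Fans out getJobLogsByID() requests to every given worker service on a
// local EventBase, merges the per-worker pages into one page that is
// contiguous in line ID, and returns the lines sorted in the requested
// direction.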
LogLines getJobLogsThreadAndEventBaseSafe(
const std::string& unqueried_workers,
const std::vector<cpp2::ServiceAddress>& services,
const string& logtype,
const vector<string>& jobs,
const vector<string>& nodes,
int64_t line_id,
bool is_ascending,
const string& regex_filter,
RemoteWorkerRunner::WorkerClientFn workerClientFn) {
LogLines res;
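  // kNotALineID doubles as the "no more lines" sentinel; it is tightened
  // below to the most restrictive nextLineID reported by any worker.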
res.nextLineID = LogLine::kNotALineID;
time_t cur_time = time(nullptr); // timestamp for any errors
if (!unqueried_workers.empty()) {
auto err = folly::to<string>(
"Warning: some workers are unhealthy, so we cannot return logs from "
"them. If your task ever ran on any of the following workers, these "
"results may be incomplete -- ", unqueried_workers
);
LOG(WARNING) << err;
res.lines.emplace_back("", "", cur_time, err, LogLine::kNotALineID);
}
// Manages a bunch of concurrent requests to get the logs from the workers.
// DANGER: Do not replace by getEventBase(), see the docstring.
folly::EventBase eb;
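  // collectAllUnsafe() + getVia(&eb) below run the callbacks on this thread
  // while it drives `eb`, so the thenError() writes to `res` do not race
  // (hence "ThreadAndEventBaseSafe").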
  auto rawResults = folly::collectAllUnsafe(
folly::gen::from(services)
| folly::gen::mapped([&](auto const& addr) {
return folly::makeFutureWith([&]{
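            // One Thrift client per worker, bound to the local EventBase.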
auto client = workerClientFn(&eb, addr);
return client->future_getJobLogsByID(
logtype,
jobs,
nodes,
line_id,
is_ascending,
// at least 100 lines per worker, but try to stay under 5000
// lines total
max(5000 / int(services.size()), 100),
regex_filter);
})
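          // On failure: append an error line for the caller, then propagate
          // the exception so collectAll() records it and the filter below
          // skips this worker's response.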
.thenError([&](folly::exception_wrapper&& ew) {
auto const service_id = debugString(addr);
// Logging is done here so that errors are emitted as they happen,
// rather than all at once at the end.
auto const err = folly::to<string>(
"Failed to fetch logs from worker ", service_id, ", this may be ",
"transient -- but if it is not, report it: ", ew.what()
);
LOG(ERROR) << err;
res.lines.emplace_back("", "", cur_time, err, LogLine::kNotALineID);
return folly::makeFuture<cpp2::LogLines>(std::move(ew));
});
})
| folly::gen::as<std::vector>())
.getVia(&eb);
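  // Keep only the successful responses; failures were already logged and
  // turned into error lines above.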
  auto const results = folly::gen::from(rawResults)
| folly::gen::filter([](auto const& _) { return _.hasValue(); })
| folly::gen::mapped([](auto& _) { return _.value(); })
| folly::gen::move
| folly::gen::as<std::vector>();
// Find the most restrictive nextLineID among all hosts
for (const auto& log : results) {
if (
// This value marks "no more lines" in LogWriter, so exclude it.
(*log.nextLineID_ref() != LogLine::kNotALineID) &&
(
        // True only until res.nextLineID is first assigned, since the check
        // above keeps kNotALineID out of the assignments below.
(res.nextLineID == LogLine::kNotALineID) ||
((res.nextLineID > *log.nextLineID_ref()) == is_ascending))) {
res.nextLineID = *log.nextLineID_ref();
}
}
// Merge log lines, dropping those beyond the selected nextLineID.
//
// This means that the client is guaranteed log lines that are truly
// sequential in line_id, making for an API that's easy to use
// correctly. In principle, this filtering could be done on the
// client-side, and the non-sequential lines could be displayed
// separately, but this seems too confusing for casual users.
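  //
  // Example (ascending): if worker A returned lines 10-14 with nextLineID 15
  // and worker B returned lines 10-12 with nextLineID 13, res.nextLineID is
  // 13 and A's lines 13 and 14 are dropped, keeping the page contiguous.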
for (const auto& log : results) {
for (const cpp2::LogLine& l : *log.lines_ref()) {
if (
// If we are on the last page of results on all hosts, drop nothing.
(res.nextLineID == LogLine::kNotALineID) ||
((*l.lineID_ref() < res.nextLineID) == is_ascending)) {
res.lines.emplace_back(
*l.jobID_ref(),
*l.nodeID_ref(),
*l.time_ref(),
*l.line_ref(),
*l.lineID_ref());
}
}
}
  // Sort the merged results from all the workers.
  sort(
    res.lines.begin(),
    res.lines.end(),
    [is_ascending](const LogLine& a, const LogLine& b) {
      return is_ascending ? (a.lineID < b.lineID) : (a.lineID > b.lineID);
    }
  );
return res;
}