in bistro/processes/TaskSubprocessQueue.cpp [132:317]
void TaskSubprocessQueue::waitForSubprocessAndPipes(
const cpp2::RunningTask& rt,
// Mutated: we add a per-task log line rate-limiter
std::shared_ptr<detail::TaskSubprocessState> state,
folly::Subprocess&& proc,
TaskSubprocessQueue::StatusCob status_cob) noexcept {
// Below, the `noexcept` guarantees that we don't delete from tasks_ twice
// The pipe & subprocess handlers must run in the same EventBase thread,
// since they mutate the same state. Also, the rate-limiter
// initialization below depends on `evb` running the present function.
auto evb = folly::EventBaseManager::get()->getExistingEventBase();
CHECK(evb); // The thread pool should have made the EventBase.
// Take ownership of proc's pipes, and set up their log processing on the
// EventBase. Produce 'pipe closed' futures.
//
// Technically, this block could be outside of the `noexcept`, but it
// should not throw anyhow, so leave it in the same function.
std::vector<folly::Future<folly::Unit>> pipe_closed_futures;
std::vector<std::shared_ptr<AsyncReadPipe>> pipes; // For the rate-limiter
for (auto&& p : proc.takeOwnershipOfPipes()) {
int child_fd = p.childFd;
auto job = *rt.job_ref();
auto node = *rt.node_ref();
pipes.emplace_back(asyncReadPipe(
evb,
std::move(p.pipe),
readPipeLinesCallback([
this, job, node, child_fd, state
](AsyncReadPipe*, folly::StringPiece s) {
if (s.empty()) {
// Skip the empty line at the end of \n-terminated files
return true;
}
// Rate-limit log lines per task rather than per job, since that is
// cleaner. If per-job limits are ever needed, a hacky workaround is to
// model log throughput as a statically allocated job resource.
auto* rate_lim = state->pipeRateLimiter_.get();
// This CHECK cannot fire: these AsyncReadPipes are created on the
// same EventBase thread that runs their callbacks, so
// state->pipeRateLimiter_ is set (below) before any callback can run.
CHECK(rate_lim);
if (child_fd == STDERR_FILENO) {
logWriter_->write(LogTable::STDERR, job, node, s);
rate_lim->reduceQuotaBy(1);
} else if (child_fd == STDOUT_FILENO) {
logWriter_->write(LogTable::STDOUT, job, node, s);
rate_lim->reduceQuotaBy(1);
} else if (child_fd == childStatusPipePlaceholder_.fd()) {
// Discard delimiter since status parsing assumes it's gone.
s.removeSuffix("\n");
// Keep only the last status line. This writes no logs to disk, so it
// is not charged against the line quota.
state->rawStatus_.assign(s.data(), s.size());
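// parseStatus() below consumes rawStatus_ once the process has exited
// and the pipes have closed.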
}
// Else: we don't care about other FDs -- in fact, the only other FD
// it could reasonably be is the canary pipe, and we won't indulge
// the children by paying attention to the junk they write there.
return true; // Keep reading
},
65000) // Limit line length to protect the log DB
));
pipe_closed_futures.emplace_back(pipes.back()->pipeClosed());
}
// This runs before any line callback can fire, since the current scope has
// been occupying the callbacks' EventBase since before the callbacks existed.
state->pipeRateLimiter_.reset(new AsyncReadPipeRateLimiter(
evb,
pollMs(state->opts()), // Poll at the Subprocess's rate
*state->opts().maxLogLinesPerPollInterval_ref(),
std::move(pipes)));
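// The rate-limiter shares the task's poll interval: on each tick it
// replenishes a quota of maxLogLinesPerPollInterval lines, which the
// stdout/stderr callbacks above draw down via reduceQuotaBy().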
// Add callbacks to wait for the subprocess and pipes on the same EventBase.
collectAll(
// 1) The moral equivalent of waitpid(), followed by waiting for the
//    task's cgroups to empty.
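// asyncSubprocess() polls the child on `evb` every pollMs(state->opts());
// the callback below runs on each poll, and its future completes once the
// child has exited.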
asyncSubprocess( // Never yields an exception
evb,
std::move(proc),
// Unlike std::bind, explicitly keeps the state alive.
[rt, state](folly::Subprocess& p) {
state->asyncSubprocessCallback(rt, p);
},
pollMs(state->opts()))
.thenValue([ this, rt, state, evb ](
folly::ProcessReturnCode && rc) noexcept {
logEvent(
google::INFO,
rt,
state.get(),
"process_exited",
folly::dynamic::object("message", rc.str())); // noexcept
// Now that the child has exited, optionally use cgroups to kill,
// and/or wait for, its left-over descendant processes.
if (state->opts().cgroupOptions_ref()->subsystems_ref()->empty()) {
// No cgroups exist: shortcut early to avoid logging
// "cgroups_reaped".
return folly::Future<folly::Unit>();
}
// Usually, well-behaved descendants will have exited by this point.
// This reaper will write logspam, and wait until all processes in
// each of the task's cgroups have exited. If the `freezer`
// subsystem is available, or if `killWithoutFreezer` is set, the
// reaper will also repeatedly send them SIGKILL.
//
// This future fires only when the task cgroups lose all their
// tasks.
return asyncCGroupReaper(
evb,
*state->opts().cgroupOptions_ref(),
state->cgroupName(),
pollMs(state->opts()))
.thenTry(
[ this, rt, state ](folly::Try<folly::Unit> t) noexcept {
t.throwUnlessValue(); // The reaper never throws; crash if it does.
logEvent(google::INFO, rt, state.get(), "cgroups_reaped");
});
}),
// 2) Wait for the child to close all pipes
collectAll(pipe_closed_futures)
.toUnsafeFuture()
.thenValue([ this, rt, state ](
std::vector<folly::Try<folly::Unit>> &&
all_closed) noexcept { // Logs and swallows all exceptions
for (auto& try_pipe_closed : all_closed) {
try {
// TODO: Use folly::exception_wrapper once wangle supports it.
try_pipe_closed.throwUnlessValue();
} catch (const std::exception& e) {
// Carry on, the pipe is known to be closed. Logging is
// noexcept.
logEvent(
google::ERROR,
rt,
state.get(),
"task_pipe_error",
folly::dynamic::object("message", e.what())); // noexcept
}
}
logEvent(
google::INFO, rt, state.get(), "task_pipes_closed"); // noexcept
}))
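// Note: toUnsafeFuture() runs the continuation inline on the thread that
// completes the last inner future. Both inner futures complete on `evb`,
// so the final handler below stays on this EventBase, as `state` requires.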
.toUnsafeFuture()
.thenValue([ this, rt, status_cob, state ](
// Safe to ignore exceptions, since the above callbacks are noexcept.
std::tuple<folly::Try<folly::Unit>, folly::Try<folly::Unit>> &&
// `noexcept` since this is the final handler -- nothing inspects
// its result.
) noexcept {
// Parse the task's status string, if it produced one. Note that in
// the absence of a status, our behavior is actually racy. The task
// might have spontaneously exited just before Bistro tried to kill
// it. In that case, wasKilled() would be true, and we would
// "incorrectly" report incompleteBackoff(). This is an acceptable
// tradeoff, since we *need* both sides of the wasKilled() branch:
// - We must decrement the retry counter on spontaneous exits
// with no status (e.g. C++ program segfaults).
// - We must *not* decrement the retry counter for killed tasks that
// do not output a status (i.e. no SIGTERM handler), since
// preemption should be maximally transparent.
auto status = parseStatus(state.get());
// Log and report the status
logEvent(
google::INFO,
rt,
state.get(),
"got_status",
// The NoTime variety omits "backoff_duration", but we lack it
// anyway, since only TaskStatusSnapshot calls TaskStatus::update().
folly::dynamic::object("status", status.toDynamicNoTime()));
status_cob(rt, std::move(status)); // noexcept
// logEvent ignores healthcheck messages; print a short note instead.
if (*rt.job_ref() == kHealthcheckTaskJob) {
LOG(INFO) << "Healthcheck started at "
<< *rt.invocationID_ref()->startTime_ref()
<< " quit with status '" << status.toJson() << "'";
}
SYNCHRONIZED(
tasks_) { // Remove the completed task's state from the map.
if (tasks_.erase(makeInvocationID(rt)) == 0) {
LOG(FATAL) << "Missing task: " << apache::thrift::debugString(rt);
}
}
});
}