in bistro/processes/TaskSubprocessQueue.cpp [347:465]
// Launches `cmd` as a subprocess for the running task `rt`.
//
// The task's TaskSubprocessState is registered in `tasks_` synchronously on
// the calling thread. The actual launch runs asynchronously on
// `threadPool_`: it creates the working directory, builds the argument
// list, spawns the child, and hands it off to waitForSubprocessAndPipes.
//
// The child is invoked as:  cmd + [*rt.node_ref(), "/dev/fd/<N>", job_arg]
// where <N> is childStatusPipePlaceholder_.fd(). That fd is configured as a
// folly::Subprocess::PIPE_OUT pipe in the child.
//
// @param rt           The task to run. Its invocation ID (makeInvocationID)
//                     keys `tasks_`. It is captured by value for the
//                     asynchronous part.
// @param cmd          Command prefix. Must be non-empty. This is enforced by
//                     a CHECK on the pool thread, which is fatal rather than
//                     reported via `status_cob`.
// @param job_arg      Appended as the final command-line argument.
// @param working_dir  Working directory for the child. It is created,
//                     including parents, if it does not exist.
// @param status_cob   If the launch fails, called with a
//                     TaskStatus::errorBackoff status. Otherwise passed on to
//                     waitForSubprocessAndPipes.
// @param resource_cob Moved into the task's TaskSubprocessState.
// @param opts         Per-task options, moved into the task's state. They are
//                     read back via state->opts(): canary pipe, parent death
//                     signal, process group leader, and cgroup options.
// @throws BistroException synchronously if a task with the same invocation
//         ID is already present in `tasks_`.
//
// Launch failures on the pool thread that derive from std::exception are
// not thrown to the caller. Instead:
//   - the task is removed from `tasks_`;
//   - a "task_failed_to_start" event is logged;
//   - `status_cob` is called with the error status.
void TaskSubprocessQueue::runTask(
const cpp2::RunningTask& rt,
const std::vector<std::string>& cmd,
const std::string& job_arg,
const boost::filesystem::path& working_dir,
StatusCob status_cob,
ResourceCob resource_cob,
cpp2::TaskSubprocessOptions opts) {
// Set up the task's map entry up-front, because it is a usage error to
// start two tasks with the same invocation ID. It also must be done
// before setting up the callbacks, since those expect tasks_ to be ready,
// and *may* run immediately. Also, the destructor relies on tasks_ being
// added to synchronously.
auto state = std::make_shared<detail::TaskSubprocessState>(
rt,
std::move(opts),
&tasksResourceMonitor_, // Outlives task states
std::move(resource_cob)
);
// Register under the lock. emplace() reports failure (.second == false)
// when the invocation ID is already taken.
SYNCHRONIZED(tasks_) {
if (!tasks_.emplace(makeInvocationID(rt), state).second) {
throw BistroException(
"Task already running: ", apache::thrift::debugString(rt)
);
}
}
// Passing 'this' is not too scary, since the EventBase threads are the
// first to be stopped / destroyed on this object.
//
// Everything else the launch needs is captured by value.
// NOTE(review): this lambda is not `mutable`, so inside it `state` and
// `status_cob` are const. The std::move() calls on them below therefore
// cannot actually move from them; presumably they copy. Confirm against
// waitForSubprocessAndPipes' signature.
threadPool_.add([this, rt, cmd, job_arg, working_dir, status_cob, state]() {
std::string debug_cmd{"unknown"};
try {
// Populate `debug_cmd` first, since this shouldn't throw, and it's
// required for nice exception logging.
auto pipe_filename = // Unlike /proc, /dev/fd works on Linux and OS X
folly::to<std::string>("/dev/fd/", childStatusPipePlaceholder_.fd());
std::vector<std::string> args{*rt.node_ref(), pipe_filename, job_arg};
debug_cmd = folly::to<std::string>(
'[', escapeShellArgsInsecure(cmd), "] + [",
escapeShellArgsInsecure(args), ']'
);
// Create the working directory (with parents) if it is missing. Errors
// from either boost call are reported via `ec` and turned into an
// exception, which the catch below handles.
// NOTE(review): if `working_dir` exists but is not a directory, no error
// is raised here. The failure would presumably surface later, from the
// chdir() option when the Subprocess is spawned.
boost::system::error_code ec;
if (!boost::filesystem::exists(working_dir, ec)) {
boost::filesystem::create_directories(working_dir, ec);
}
if (ec) {
throw BistroException(
"Failed to make working directory: ", ec.message());
}
// A CHECK failure aborts the whole process. It is NOT turned into an
// errorBackoff status by the catch below.
CHECK(cmd.size() >= 1);
// The child's argv is `cmd` followed by `args`.
std::vector<std::string> full_cmd{cmd};
full_cmd.insert(full_cmd.end(), args.begin(), args.end());
logEvent(google::INFO, rt, state.get(), "running", folly::dynamic::object
("command", folly::dynamic(full_cmd.begin(), full_cmd.end()))
);
// Base options:
//   - capture the child's stdout and stderr;
//   - run the child in `working_dir`;
//   - expose the status pipe to the child as an output pipe.
// This local `opts` shadows the runTask parameter of the same name. That
// parameter was already moved into `state`; per-task options are read
// via state->opts().
auto opts = folly::Subprocess::Options().pipeStdout().pipeStderr()
.chdir(working_dir.native())
.fd(childStatusPipePlaceholder_.fd(), folly::Subprocess::PIPE_OUT);
if (*state->opts().useCanaryPipe_ref()) {
// It's much easier for us to get the read end of the pipe, since we
// can just use AsyncReadPipe to track its closing.
opts.fd(childCanaryPipePlaceholder_.fd(), folly::Subprocess::PIPE_OUT);
}
// A value of 0 means no parent death signal is requested.
if (*state->opts().parentDeathSignal_ref() != 0) {
opts.parentDeathSignal(*state->opts().parentDeathSignal_ref());
}
if (*state->opts().processGroupLeader_ref()) {
opts.processGroupLeader();
}
// folly::Subprocess() can throw, and that's ok. Do not make `proc`
// inline with waitForSubprocessAndPipes to ensure that `state` is
// safe to use in the `catch` below.
auto proc = [&state, &full_cmd](
// Pass by r-value, since opts will contain an invalid pointer after
// add_to_cgroups is destroyed below.
folly::Subprocess::Options&& options
) {
// An empty cgroup name means the child is not placed in cgroups.
if (state->cgroupName().empty()) {
return folly::Subprocess(full_cmd, options);
}
LOG(INFO) << "Making task cgroups named " << state->cgroupName();
// This must live only until folly::Subprocess's constructor exits.
// It is registered as the post-fork, pre-exec callback, so it runs in
// the child before exec.
AddChildToCGroups add_to_cgroups(cgroupSetup(
state->cgroupName(), *state->opts().cgroupOptions_ref()));
options.dangerousPostForkPreExecCallback(&add_to_cgroups);
return folly::Subprocess(full_cmd, options);
}(std::move(opts));
// IMPORTANT: This call must be last in the block and `noexcept` to
// ensure that the below "remove from tasks_" cannot race the "remove
// from tasks_" in the "subprocess exited and pipes closed" callback.
waitForSubprocessAndPipes(
rt, std::move(state), std::move(proc), std::move(status_cob)
);
} catch (const std::exception& e) {
// Only std::exception-derived errors are handled here. The launch
// failed, so undo the registration made in runTask and report an
// errorBackoff status.
SYNCHRONIZED(tasks_) { // Remove the task -- its callback won't run.
if (tasks_.erase(makeInvocationID(rt)) == 0) {
LOG(FATAL) << "Missing task: " << apache::thrift::debugString(rt);
}
}
auto msg = folly::to<std::string>(
"Failed to start task ", apache::thrift::debugString(rt), " / ",
debug_cmd, ": ", e.what()
);
auto status = TaskStatus::errorBackoff(msg);
// Although `state` is moved above, this is ok, since our move
// constructors and waitForSubprocessAndPipes must be noexcept.
logEvent(
google::ERROR, rt, state.get(), "task_failed_to_start",
folly::dynamic::object("status", status.toDynamicNoTime())
); // noexcept
// Advances the backoff/retry counter, so we eventually give up.
status_cob(rt, std::move(status)); // noexcept
}
});
}