in folly/Subprocess.cpp [581:1106]
int Subprocess::prepareChild(
const Options& options,
const sigset_t* sigmask,
const char* childDir) const {
// While all signals are blocked, we must reset their
// dispositions to default.
for (int sig = 1; sig < NSIG; ++sig) {
::signal(sig, SIG_DFL);
}
{
// Unblock signals; restore signal mask.
int r = pthread_sigmask(SIG_SETMASK, sigmask, nullptr);
if (r != 0) {
return r; // pthread_sigmask() returns an errno value
}
}
// Change the working directory, if one is given
if (childDir) {
if (::chdir(childDir) == -1) {
return errno;
}
}
#ifdef __linux__
// Best effort
if (options.cpuSet_.hasValue()) {
const auto& cpuSet = options.cpuSet_.value();
::sched_setaffinity(0, sizeof(cpuSet), &cpuSet);
}
#endif
// We don't have to explicitly close the parent's end of all pipes,
// as they all have the FD_CLOEXEC flag set and will be closed at
// exec time.
// Redirect requested FDs to /dev/null or NUL
// dup2 any explicitly specified FDs
for (auto& p : options.fdActions_) {
if (p.second == DEV_NULL) {
// folly/portability/Fcntl provides an impl of open that will
// map this to NUL on Windows.
auto devNull = ::open("/dev/null", O_RDWR | O_CLOEXEC);
if (devNull == -1) {
return errno;
}
// note: dup2 will not set CLOEXEC on the destination
if (::dup2(devNull, p.first) == -1) {
// explicit close on error to avoid leaking fds
::close(devNull);
return errno;
}
::close(devNull);
} else if (p.second != p.first) {
if (::dup2(p.second, p.first) == -1) {
return errno;
}
}
}
if (options.closeOtherFds_) {
closeInheritedFds(options.fdActions_);
}
#if defined(__linux__)
// Opt to receive signal on parent death, if requested
if (options.parentDeathSignal_ != 0) {
const auto parentDeathSignal =
static_cast<unsigned long>(options.parentDeathSignal_);
if (prctl(PR_SET_PDEATHSIG, parentDeathSignal, 0, 0, 0) == -1) {
return errno;
}
}
#endif
if (options.processGroupLeader_) {
#if !defined(__FreeBSD__)
if (setpgrp() == -1) {
#else
if (setpgrp(getpid(), getpgrp()) == -1) {
#endif
return errno;
}
}
// The user callback comes last, so that the child is otherwise all set up.
if (options.dangerousPostForkPreExecCallback_) {
if (int error = (*options.dangerousPostForkPreExecCallback_)()) {
return error;
}
}
return 0;
}
int Subprocess::runChild(
const char* executable,
char** argv,
char** env,
const Options& options) const {
// Now, finally, exec.
if (options.usePath_) {
::execvp(executable, argv);
} else {
::execve(executable, argv, env);
}
return errno;
}
void Subprocess::readChildErrorPipe(int pfd, const char* executable) {
ChildErrorInfo info;
auto rc = readNoInt(pfd, &info, sizeof(info));
if (rc == 0) {
// No data means the child executed successfully, and the pipe
// was closed due to the close-on-exec flag being set.
return;
} else if (rc != sizeof(ChildErrorInfo)) {
// An error occurred trying to read from the pipe, or we got a partial read.
// Neither of these cases should really occur in practice.
//
// We can't get any error data from the child in this case, and we don't
// know if it is successfully running or not. All we can do is to return
// normally, as if the child executed successfully. If something bad
// happened the caller should at least get a non-normal exit status from
// the child.
XLOGF(
ERR,
"unexpected error trying to read from child error pipe rc={}, errno={}",
rc,
errno);
return;
}
// We got error data from the child. The child should exit immediately in
// this case, so wait on it to clean up.
wait();
// Throw to signal the error
throw SubprocessSpawnError(executable, info.errCode, info.errnoValue);
}
ProcessReturnCode Subprocess::poll(struct rusage* ru) {
returnCode_.enforce(ProcessReturnCode::RUNNING);
DCHECK_GT(pid_, 0);
int status;
pid_t found = ::wait4(pid_, &status, WNOHANG, ru);
// The spec guarantees that EINTR does not occur with WNOHANG, so the only
// two remaining errors are ECHILD (other code reaped the child?), or
// EINVAL (cosmic rays?), both of which merit an abort:
PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, WNOHANG)";
if (found != 0) {
// Though the child process had quit, this call does not close the pipes
// since its descendants may still be using them.
returnCode_ = ProcessReturnCode::make(status);
pid_ = -1;
}
return returnCode_;
}
bool Subprocess::pollChecked() {
if (poll().state() == ProcessReturnCode::RUNNING) {
return false;
}
checkStatus(returnCode_);
return true;
}
ProcessReturnCode Subprocess::wait() {
returnCode_.enforce(ProcessReturnCode::RUNNING);
DCHECK_GT(pid_, 0);
int status;
pid_t found;
do {
found = ::waitpid(pid_, &status, 0);
} while (found == -1 && errno == EINTR);
// The only two remaining errors are ECHILD (other code reaped the
// child?), or EINVAL (cosmic rays?), and both merit an abort:
PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, 0)";
// Though the child process had quit, this call does not close the pipes
// since its descendants may still be using them.
DCHECK_EQ(found, pid_);
returnCode_ = ProcessReturnCode::make(status);
pid_ = -1;
return returnCode_;
}
void Subprocess::waitChecked() {
wait();
checkStatus(returnCode_);
}
ProcessReturnCode Subprocess::waitTimeout(TimeoutDuration timeout) {
returnCode_.enforce(ProcessReturnCode::RUNNING);
DCHECK_GT(pid_, 0) << "The subprocess has been waited already";
auto pollUntil = std::chrono::steady_clock::now() + timeout;
auto sleepDuration = std::chrono::milliseconds{2};
constexpr auto maximumSleepDuration = std::chrono::milliseconds{100};
for (;;) {
// Always call waitpid once after the full timeout has elapsed.
auto now = std::chrono::steady_clock::now();
int status;
pid_t found;
do {
found = ::waitpid(pid_, &status, WNOHANG);
} while (found == -1 && errno == EINTR);
PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, WNOHANG)";
if (found) {
// Just on the safe side, make sure it's the actual pid we are waiting.
DCHECK_EQ(found, pid_);
returnCode_ = ProcessReturnCode::make(status);
// Change pid_ to -1 to detect programming error like calling
// this method multiple times.
pid_ = -1;
return returnCode_;
}
if (now > pollUntil) {
// Timed out: still running().
return returnCode_;
}
// The subprocess is still running, sleep for increasing periods of time.
std::this_thread::sleep_for(sleepDuration);
sleepDuration =
std::min(maximumSleepDuration, sleepDuration + sleepDuration);
}
}
void Subprocess::sendSignal(int signal) {
returnCode_.enforce(ProcessReturnCode::RUNNING);
int r = ::kill(pid_, signal);
checkUnixError(r, "kill");
}
ProcessReturnCode Subprocess::waitOrTerminateOrKill(
TimeoutDuration waitTimeout, TimeoutDuration sigtermTimeout) {
returnCode_.enforce(ProcessReturnCode::RUNNING);
DCHECK_GT(pid_, 0) << "The subprocess has been waited already";
this->waitTimeout(waitTimeout);
if (returnCode_.running()) {
return terminateOrKill(sigtermTimeout);
}
return returnCode_;
}
ProcessReturnCode Subprocess::terminateOrKill(TimeoutDuration sigtermTimeout) {
returnCode_.enforce(ProcessReturnCode::RUNNING);
DCHECK_GT(pid_, 0) << "The subprocess has been waited already";
// 1. Send SIGTERM to kill the process
terminate();
// 2. check whether subprocess has terminated using non-blocking waitpid
waitTimeout(sigtermTimeout);
if (!returnCode_.running()) {
return returnCode_;
}
// 3. If we are at this point, we have waited enough time after
// sending SIGTERM, we have to use nuclear option SIGKILL to kill
// the subprocess.
XLOGF(INFO, "Send SIGKILL to {}", pid_);
kill();
// 4. SIGKILL should kill the process otherwise there must be
// something seriously wrong, just use blocking wait to wait for the
// subprocess to finish.
return wait();
}
pid_t Subprocess::pid() const {
return pid_;
}
namespace {
ByteRange queueFront(const IOBufQueue& queue) {
auto* p = queue.front();
if (!p) {
return ByteRange{};
}
return io::Cursor(p).peekBytes();
}
// fd write
bool handleWrite(int fd, IOBufQueue& queue) {
for (;;) {
auto b = queueFront(queue);
if (b.empty()) {
return true; // EOF
}
ssize_t n = writeNoInt(fd, b.data(), b.size());
if (n == -1 && errno == EAGAIN) {
return false;
}
checkUnixError(n, "write");
queue.trimStart(n);
}
}
// fd read
bool handleRead(int fd, IOBufQueue& queue) {
for (;;) {
auto p = queue.preallocate(100, 65000);
ssize_t n = readNoInt(fd, p.first, p.second);
if (n == -1 && errno == EAGAIN) {
return false;
}
checkUnixError(n, "read");
if (n == 0) {
return true;
}
queue.postallocate(n);
}
}
bool discardRead(int fd) {
static const size_t bufSize = 65000;
// Thread unsafe, but it doesn't matter.
static std::unique_ptr<char[]> buf(new char[bufSize]);
for (;;) {
ssize_t n = readNoInt(fd, buf.get(), bufSize);
if (n == -1 && errno == EAGAIN) {
return false;
}
checkUnixError(n, "read");
if (n == 0) {
return true;
}
}
}
} // namespace
std::pair<std::string, std::string> Subprocess::communicate(StringPiece input) {
IOBufQueue inputQueue;
inputQueue.wrapBuffer(input.data(), input.size());
auto outQueues = communicateIOBuf(std::move(inputQueue));
auto outBufs =
std::make_pair(outQueues.first.move(), outQueues.second.move());
std::pair<std::string, std::string> out;
if (outBufs.first) {
outBufs.first->coalesce();
out.first.assign(
reinterpret_cast<const char*>(outBufs.first->data()),
outBufs.first->length());
}
if (outBufs.second) {
outBufs.second->coalesce();
out.second.assign(
reinterpret_cast<const char*>(outBufs.second->data()),
outBufs.second->length());
}
return out;
}
std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
IOBufQueue input) {
// If the user supplied a non-empty input buffer, make sure
// that stdin is a pipe so we can write the data.
if (!input.empty()) {
// findByChildFd() will throw std::invalid_argument if no pipe for
// STDIN_FILENO exists
findByChildFd(STDIN_FILENO);
}
std::pair<IOBufQueue, IOBufQueue> out;
auto readCallback = [&](int pfd, int cfd) -> bool {
if (cfd == STDOUT_FILENO) {
return handleRead(pfd, out.first);
} else if (cfd == STDERR_FILENO) {
return handleRead(pfd, out.second);
} else {
// Don't close the file descriptor, the child might not like SIGPIPE,
// just read and throw the data away.
return discardRead(pfd);
}
};
auto writeCallback = [&](int pfd, int cfd) -> bool {
if (cfd == STDIN_FILENO) {
return handleWrite(pfd, input);
} else {
// If we don't want to write to this fd, just close it.
return true;
}
};
communicate(std::move(readCallback), std::move(writeCallback));
return out;
}
void Subprocess::communicate(
FdCallback readCallback, FdCallback writeCallback) {
// This serves to prevent wait() followed by communicate(), but if you
// legitimately need that, send a patch to delete this line.
returnCode_.enforce(ProcessReturnCode::RUNNING);
setAllNonBlocking();
std::vector<pollfd> fds;
fds.reserve(pipes_.size());
std::vector<size_t> toClose; // indexes into pipes_
toClose.reserve(pipes_.size());
while (!pipes_.empty()) {
fds.clear();
toClose.clear();
for (auto& p : pipes_) {
pollfd pfd;
pfd.fd = p.pipe.fd();
// Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
// child's point of view.
if (!p.enabled) {
// Still keeping fd in watched set so we get notified of POLLHUP /
// POLLERR
pfd.events = 0;
} else if (p.direction == PIPE_IN) {
pfd.events = POLLOUT;
} else {
pfd.events = POLLIN;
}
fds.push_back(pfd);
}
int r;
do {
r = ::poll(fds.data(), fds.size(), -1);
} while (r == -1 && errno == EINTR);
checkUnixError(r, "poll");
for (size_t i = 0; i < pipes_.size(); ++i) {
auto& p = pipes_[i];
auto parentFd = p.pipe.fd();
DCHECK_EQ(fds[i].fd, parentFd);
short events = fds[i].revents;
bool closed = false;
if (events & POLLOUT) {
DCHECK(!(events & POLLIN));
if (writeCallback(parentFd, p.childFd)) {
toClose.push_back(i);
closed = true;
}
}
// Call read callback on POLLHUP, to give it a chance to read (and act
// on) end of file
if (events & (POLLIN | POLLHUP)) {
DCHECK(!(events & POLLOUT));
if (readCallback(parentFd, p.childFd)) {
toClose.push_back(i);
closed = true;
}
}
if ((events & (POLLHUP | POLLERR)) && !closed) {
toClose.push_back(i);
}
}
// Close the fds in reverse order so the indexes hold after erase()
for (int idx : boost::adaptors::reverse(toClose)) {
auto pos = pipes_.begin() + idx;
pos->pipe.close(); // Throws on error
pipes_.erase(pos);
}
}
}
void Subprocess::enableNotifications(int childFd, bool enabled) {
pipes_[findByChildFd(childFd)].enabled = enabled;
}
bool Subprocess::notificationsEnabled(int childFd) const {
return pipes_[findByChildFd(childFd)].enabled;
}
size_t Subprocess::findByChildFd(int childFd) const {
auto pos = std::lower_bound(
pipes_.begin(), pipes_.end(), childFd, [](const Pipe& pipe, int fd) {
return pipe.childFd < fd;
});
if (pos == pipes_.end() || pos->childFd != childFd) {
throw std::invalid_argument(
folly::to<std::string>("child fd not found ", childFd));
}
return pos - pipes_.begin();
}
void Subprocess::closeParentFd(int childFd) {
int idx = findByChildFd(childFd);
pipes_[idx].pipe.close(); // May throw
pipes_.erase(pipes_.begin() + idx);
}
std::vector<Subprocess::ChildPipe> Subprocess::takeOwnershipOfPipes() {
std::vector<Subprocess::ChildPipe> pipes;
for (auto& p : pipes_) {
pipes.emplace_back(p.childFd, std::move(p.pipe));
}
// release memory
std::vector<Pipe>().swap(pipes_);
return pipes;
}
namespace {
class Initializer {
public:
Initializer() {
// We like EPIPE, thanks.
::signal(SIGPIPE, SIG_IGN);
}
};
Initializer initializer;
} // namespace
} // namespace folly