common/protobuf/kudu/util/subprocess.cc (637 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "kudu/util/subprocess.h" #include <dirent.h> #include <fcntl.h> #include <signal.h> #if defined(__linux__) #include <sys/prctl.h> #endif #include <sys/wait.h> #include <unistd.h> #include <cerrno> #include <cstdint> #include <cstdlib> #include <cstring> #include <functional> #include <memory> #include <ostream> #include <string> #include <utility> #include <vector> #include <ev++.h> #include <glog/logging.h> #include <glog/raw_logging.h> #include <glog/stl_logging.h> #include "kudu/gutil/basictypes.h" #include "kudu/gutil/port.h" #include "kudu/gutil/strings/join.h" #include "kudu/gutil/strings/numbers.h" #include "kudu/gutil/strings/split.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/env.h" #include "kudu/util/errno.h" #include "kudu/util/faststring.h" #include "kudu/util/monotime.h" #include "kudu/util/signal.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" using std::map; using std::string; using std::unique_ptr; using std::vector; using strings::Split; using strings::Substitute; using strings::SubstituteAndAppend; namespace kudu { // Make glog's STL-compatible operators visible inside this namespace. using ::operator<<; namespace { static double kProcessWaitTimeoutSeconds = 5.0; static const char* kProcSelfFd = #if defined(__APPLE__) "/dev/fd"; #else "/proc/self/fd"; #endif // defined(__APPLE__) #if defined(__linux__) #define READDIR readdir64 #define DIRENT dirent64 #else #define READDIR readdir #define DIRENT dirent #endif // Since opendir() calls malloc(), this must be called before fork(). // This function is not async-signal-safe. Status OpenProcFdDir(DIR** dir) { *dir = opendir(kProcSelfFd); if (PREDICT_FALSE(dir == nullptr)) { return Status::IOError(Substitute("opendir(\"$0\") failed", kProcSelfFd), ErrnoToString(errno), errno); } return Status::OK(); } // Close the directory stream opened by OpenProcFdDir(). // This function is not async-signal-safe. void CloseProcFdDir(DIR* dir) { if (PREDICT_FALSE(closedir(dir) == -1)) { LOG(WARNING) << "Unable to close fd dir: " << Status::IOError(Substitute("closedir(\"$0\") failed", kProcSelfFd), ErrnoToString(errno), errno).ToString(); } } // Close all open file descriptors other than stdin, stderr, stdout. // Expects a directory stream created by OpenProdFdDir() as a parameter. // This function is called after fork() and must not call malloc(). // The rule of thumb is to only call async-signal-safe functions in such cases // if at all possible. void CloseNonStandardFDs(DIR* fd_dir) { // This is implemented by iterating over the open file descriptors // rather than using sysconf(SC_OPEN_MAX) -- the latter is error prone // since it may not represent the highest open fd if the fd soft limit // has changed since the process started. This should also be faster // since iterating over all possible fds is likely to cause 64k+ syscalls // in typical configurations. // // Note also that this doesn't use any of the Env utility functions, to // make it as lean and mean as possible -- this runs in the subprocess // after a fork, so there's some possibility that various global locks // inside malloc() might be held, so allocating memory is a no-no. RAW_CHECK(fd_dir != nullptr, "fd_dir is null"); int dir_fd = dirfd(fd_dir); struct DIRENT* ent; // readdir64() is not reentrant (it uses a static buffer) and it also // locks fd_dir->lock, so it must not be called in a multi-threaded // environment and is certainly not async-signal-safe. // However, it appears to be safe to call right after fork(), since only one // thread exists in the child process at that time. It also does not call // malloc() or free(). We could use readdir64_r() instead, but all that // buys us is reentrancy, and not async-signal-safety, due to the use of // dir->lock, so seems not worth the added complexity in lifecycle & plumbing. while ((ent = READDIR(fd_dir)) != nullptr) { uint32_t fd; if (!safe_strtou32(ent->d_name, &fd)) continue; if (!(fd == STDIN_FILENO || fd == STDOUT_FILENO || fd == STDERR_FILENO || fd == dir_fd)) { int ret; RETRY_ON_EINTR(ret, close(fd)); } } } void RedirectToDevNull(int fd) { // We must not close stderr or stdout, because then when a new file // descriptor is opened, it might reuse the closed file descriptor's number // (we always allocate the lowest available file descriptor number). // // Instead, we open /dev/null as a new file descriptor, then use dup2() to // atomically close 'fd' and reuse its file descriptor number as an open file // handle to /dev/null. // // It is expected that the file descriptor allocated when opening /dev/null // will be closed when the child process closes all of its "non-standard" // file descriptors later on. int dev_null; RETRY_ON_EINTR(dev_null, open("/dev/null", O_WRONLY)); if (dev_null == -1) { int err = errno; RAW_LOG(WARNING, "failed to open /dev/null: [%d]", err); } else { int ret; RETRY_ON_EINTR(ret, dup2(dev_null, fd)); if (ret == -1) { int err = errno; RAW_LOG(FATAL, "dup2() on /dev/null failed: [%d]", err); } } } // Stateful libev watcher to help ReadFdsFully(). class ReadFdsFullyHelper { public: ReadFdsFullyHelper(string progname, ev::dynamic_loop* loop, int fd) : progname_(std::move(progname)) { // Bind the watcher to the provided loop, to this functor, and to the // readable fd. watcher_.set(*loop); watcher_.set(this); watcher_.set(fd, ev::READ); // The watcher will now be polled when its loop is run. watcher_.start(); } void operator() (ev::io &w, int revents) { DCHECK_EQ(ev::READ, revents); char buf[1024]; ssize_t n; RETRY_ON_EINTR(n, read(w.fd, buf, arraysize(buf))); if (n == 0) { // EOF, stop watching. w.stop(); } else if (n < 0) { // A fatal error. Store it and stop watching. status_ = Status::IOError("IO error reading from " + progname_, ErrnoToString(errno), errno); w.stop(); } else { // Add our bytes and keep watching. output_.append(buf, n); } } const Status& status() const { return status_; } const string& output() const { return output_; } private: const string progname_; ev::io watcher_; string output_; Status status_; }; // Reads from all descriptors in 'fds' until EOF on all of them. If any read // yields an error, it is returned. Otherwise, 'out' contains the bytes read // for each fd, in the same order as was in 'fds'. Status ReadFdsFully(const string& progname, const vector<int>& fds, vector<string>* out) { ev::dynamic_loop loop; // Set up a watcher for each fd. vector<unique_ptr<ReadFdsFullyHelper>> helpers; for (int fd : fds) { helpers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fd)); } // This will read until all fds return EOF. loop.run(); // Check for failures. for (const auto& h : helpers) { if (!h->status().ok()) { return h->status(); } } // No failures; write the output to the caller. for (const auto& h : helpers) { out->push_back(h->output()); } return Status::OK(); } } // anonymous namespace Subprocess::Subprocess(vector<string> argv, int sig_on_destruct) : program_(argv[0]), argv_(std::move(argv)), state_(kNotStarted), child_pid_(-1), fd_state_(), child_fds_(), sig_on_destruct_(sig_on_destruct) { fd_state_[STDIN_FILENO] = PIPED; fd_state_[STDOUT_FILENO] = SHARED; fd_state_[STDERR_FILENO] = SHARED; child_fds_[STDIN_FILENO] = -1; child_fds_[STDOUT_FILENO] = -1; child_fds_[STDERR_FILENO] = -1; } Subprocess::~Subprocess() { if (state_ == kRunning) { LOG(WARNING) << Substitute( "Child process $0 ($1) was orphaned. Sending signal $2...", child_pid_, JoinStrings(argv_, " "), sig_on_destruct_); WARN_NOT_OK(KillAndWait(sig_on_destruct_), Substitute("Failed to KillAndWait() with signal $0", sig_on_destruct_)); } for (int i = 0; i < 3; ++i) { if (fd_state_[i] == PIPED && child_fds_[i] >= 0) { int ret; RETRY_ON_EINTR(ret, close(child_fds_[i])); } } } #if defined(__APPLE__) static int pipe2(int pipefd[2], int flags) { DCHECK_EQ(O_CLOEXEC, flags); int new_fds[2]; if (pipe(new_fds) == -1) { return -1; } if (fcntl(new_fds[0], F_SETFD, O_CLOEXEC) == -1) { int ret; RETRY_ON_EINTR(ret, close(new_fds[0])); RETRY_ON_EINTR(ret, close(new_fds[1])); return -1; } if (fcntl(new_fds[1], F_SETFD, O_CLOEXEC) == -1) { int ret; RETRY_ON_EINTR(ret, close(new_fds[0])); RETRY_ON_EINTR(ret, close(new_fds[1])); return -1; } pipefd[0] = new_fds[0]; pipefd[1] = new_fds[1]; return 0; } #endif Status Subprocess::Start() { VLOG(2) << "Invoking command: " << argv_; if (state_ != kNotStarted) { const string err_str = Substitute("$0: illegal sub-process state", state_); LOG(DFATAL) << err_str; return Status::IllegalState(err_str); } if (argv_.empty()) { return Status::InvalidArgument("argv must have at least one elem"); } // We explicitly set SIGPIPE to SIG_IGN here because we are using UNIX pipes. IgnoreSigPipe(); vector<char*> argv_ptrs; for (const string& arg : argv_) { argv_ptrs.push_back(const_cast<char*>(arg.c_str())); } argv_ptrs.push_back(nullptr); // Pipe from caller process to child's stdin // [0] = stdin for child, [1] = how parent writes to it int child_stdin[2] = {-1, -1}; if (fd_state_[STDIN_FILENO] == PIPED) { PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0); } // Pipe from child's stdout back to caller process // [0] = how parent reads from child's stdout, [1] = how child writes to it int child_stdout[2] = {-1, -1}; if (fd_state_[STDOUT_FILENO] == PIPED) { PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0); } // Pipe from child's stderr back to caller process // [0] = how parent reads from child's stderr, [1] = how child writes to it int child_stderr[2] = {-1, -1}; if (fd_state_[STDERR_FILENO] == PIPED) { PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0); } // The synchronization pipe: this trick is to make sure the parent returns // control only after the child process has invoked execvp(). int sync_pipe[2]; PCHECK(pipe2(sync_pipe, O_CLOEXEC) == 0); DIR* fd_dir = nullptr; RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir"); unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir, CloseProcFdDir); int ret; RETRY_ON_EINTR(ret, fork()); if (ret == -1) { return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno); } if (ret == 0) { // We are the child // As a general note, it's not safe to call non-async-signal-safe functions // in the child process between fork() and exec(). Surprisingly, a call to // LOG() locks a mutex that may have been copied from the parent's address // space in an already locked state, so it is not async-signal-safe and // can deadlock the child if called. So, in this vulnerable state the child // outputs log messages using RAW_LOG() instead directly into stderr. // RAW_LOG() uses vsnprintf() under the hood: it's not async-signal-safe // strictly speaking (might call malloc() and getenv() in some cases which // might acquire locks themselves), but it's much better than using LOG() // where it can simply deadlock on glog's mutex. BTW, some allocators like // tcmalloc install pthread_atfork() handlers, so with tcmalloc we have // more safety with vsnprintf(). // // An alternative approach might be to use some additional functionality // in glog library (once implemented) to establish thread_atfork() handlers; // see https://github.com/robi56/google-glog/issues/101 for details. // Send the child a SIGTERM when the parent dies. This is done as early // as possible in the child's life to prevent any orphaning whatsoever // (e.g. from KUDU-402). #if defined(__linux__) // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways // to prevent orphans when parent is killed. prctl(PR_SET_PDEATHSIG, SIGKILL); #endif // stdin if (fd_state_[STDIN_FILENO] == PIPED) { int dup2_ret; RETRY_ON_EINTR(dup2_ret, dup2(child_stdin[0], STDIN_FILENO)); if (dup2_ret != STDIN_FILENO) { int err = errno; RAW_LOG(FATAL, "dup2() failed (STDIN): [%d]", err); } } else { RAW_DCHECK(SHARED == fd_state_[STDIN_FILENO], "unexpected state of STDIN"); } // stdout switch (fd_state_[STDOUT_FILENO]) { case PIPED: { int dup2_ret; RETRY_ON_EINTR(dup2_ret, dup2(child_stdout[1], STDOUT_FILENO)); if (dup2_ret != STDOUT_FILENO) { int err = errno; RAW_LOG(FATAL, "dup2() failed (STDOUT): [%d]", err); } break; } case DISABLED: { RedirectToDevNull(STDOUT_FILENO); break; } default: RAW_DCHECK(SHARED == fd_state_[STDOUT_FILENO], "unexpected state of STDOUT"); break; } // stderr switch (fd_state_[STDERR_FILENO]) { case PIPED: { int dup2_ret; RETRY_ON_EINTR(dup2_ret, dup2(child_stderr[1], STDERR_FILENO)); if (dup2_ret != STDERR_FILENO) { int err = errno; RAW_LOG(FATAL, "dup2() failed (STDERR): [%d]", err); } break; } case DISABLED: { RedirectToDevNull(STDERR_FILENO); break; } default: RAW_DCHECK(SHARED == fd_state_[STDERR_FILENO], "unexpected state of STDERR"); break; } // Close the read side of the sync pipe; // the write side should be closed upon execvp(). int close_ret; RETRY_ON_EINTR(close_ret, close(sync_pipe[0])); if (close_ret == -1) { int err = errno; RAW_LOG(FATAL, "close() on the read side of sync pipe failed: [%d]", err); } CloseNonStandardFDs(fd_dir); // Ensure we are not ignoring or blocking signals in the child process. ResetAllSignalMasksToUnblocked(); // Reset the disposition of SIGPIPE to SIG_DFL because we routinely set its // disposition to SIG_IGN via IgnoreSigPipe(). At the time of writing, we // don't explicitly ignore any other signals in Kudu. ResetSigPipeHandlerToDefault(); // Set the current working directory of the subprocess. if (!cwd_.empty() && chdir(cwd_.c_str()) == -1) { int err = errno; RAW_LOG(FATAL, "chdir() to '%s' failed: [%d]", cwd_.c_str(), err); } // Set the environment for the subprocess. This is more portable than // using execvpe(), which doesn't exist on OS X. We rely on the 'p' // variant of exec to do $PATH searching if the executable specified // by the caller isn't an absolute path. for (const auto& env : env_) { ignore_result(setenv(env.first.c_str(), env.second.c_str(), 1 /* overwrite */)); } execvp(program_.c_str(), &argv_ptrs[0]); int err = errno; RAW_LOG(ERROR, "could not exec '%s': [%d]", program_.c_str(), err); _exit(err); } else { // We are the parent child_pid_ = ret; // Close child's side of the pipes int close_ret; if (fd_state_[STDIN_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdin[0])); if (fd_state_[STDOUT_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stdout[1])); if (fd_state_[STDERR_FILENO] == PIPED) RETRY_ON_EINTR(close_ret, close(child_stderr[1])); // Keep parent's side of the pipes child_fds_[STDIN_FILENO] = child_stdin[1]; child_fds_[STDOUT_FILENO] = child_stdout[0]; child_fds_[STDERR_FILENO] = child_stderr[0]; // Wait for the child process to invoke execvp(). The trick involves // a pipe with O_CLOEXEC option for its descriptors. The parent process // performs blocking read from the pipe while the write side of the pipe // is kept open by the child (it does not write any data, though). The write // side of the pipe is closed when the child invokes execvp(). At that // point, the parent should receive EOF, i.e. read() should return 0. { // Close the write side of the sync pipe. It's crucial to make sure // it succeeds otherwise the blocking read() below might wait forever // even if the child process has closed the pipe. RETRY_ON_EINTR(close_ret, close(sync_pipe[1])); PCHECK(close_ret == 0); while (true) { uint8_t buf; int err = 0; int rc; RETRY_ON_EINTR(rc, read(sync_pipe[0], &buf, 1)); if (rc == -1) { err = errno; } RETRY_ON_EINTR(close_ret, close(sync_pipe[0])); PCHECK(close_ret == 0); if (rc == 0) { // That's OK -- expecting EOF from the other side of the pipe. break; } else if (rc == -1) { // Other errors besides EINTR are not expected. return Status::RuntimeError("Unexpected error from the sync pipe", ErrnoToString(err), err); } // No data is expected from the sync pipe. LOG(FATAL) << Substitute("$0: unexpected data from the sync pipe", rc); } } } state_ = kRunning; return Status::OK(); } Status Subprocess::Wait(int* wait_status) { return DoWait(wait_status, BLOCKING); } Status Subprocess::WaitNoBlock(int* wait_status) { return DoWait(wait_status, NON_BLOCKING); } Status Subprocess::WaitAndCheckExitCode() { int wait_status; RETURN_NOT_OK(DoWait(&wait_status, BLOCKING)); int exit_status; string info_str; RETURN_NOT_OK(GetExitStatus(&exit_status, &info_str)); return exit_status == 0 ? Status::OK() : Status::RuntimeError(Substitute("Exit code: $0 ($1)", exit_status, info_str)); } Status Subprocess::GetProcfsState(int pid, ProcfsState* state) { faststring data; string filename = Substitute("/proc/$0/stat", pid); RETURN_NOT_OK(ReadFileToString(Env::Default(), filename, &data)); // The part of /proc/<pid>/stat that's relevant for us looks like this: // // "16009 (subprocess-test) R ..." // // The first number is the PID, the string in the parens in the command, and // the single letter afterwards is the process' state. // // To extract the state, we scan backwards looking for the last ')', then // increment past it and the separating space. This is safer than scanning // forward as it properly handles commands containing parens. string data_str = data.ToString(); const char* end_parens = strrchr(data_str.c_str(), ')'); if (end_parens == nullptr) { return Status::RuntimeError(Substitute("unexpected layout in $0", filename)); } char proc_state = end_parens[2]; switch (proc_state) { case 'T': *state = ProcfsState::PAUSED; break; default: *state = ProcfsState::RUNNING; break; } return Status::OK(); } Status Subprocess::Kill(int signal) { if (state_ != kRunning) { const string err_str = "Sub-process is not running"; LOG(DFATAL) << err_str; return Status::IllegalState(err_str); } if (kill(child_pid_, signal) != 0) { return Status::RuntimeError("Unable to kill", ErrnoToString(errno), errno); } // Signal delivery is often asynchronous. For some signals, we try to wait // for the process to actually change state, using /proc/<pid>/stat as a // guide. This is best-effort. ProcfsState desired_state; switch (signal) { case SIGSTOP: desired_state = ProcfsState::PAUSED; break; case SIGCONT: desired_state = ProcfsState::RUNNING; break; default: return Status::OK(); } Stopwatch sw; sw.start(); do { ProcfsState current_state; if (!GetProcfsState(child_pid_, &current_state).ok()) { // There was some error parsing /proc/<pid>/stat (or perhaps it doesn't // exist on this platform). return Status::OK(); } if (current_state == desired_state) { return Status::OK(); } SleepFor(MonoDelta::FromMilliseconds(10)); } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds); return Status::OK(); } Status Subprocess::KillAndWait(int signal) { if (state_ != kRunning) { const string err_str = "Sub-process is not running"; LOG(DFATAL) << err_str; return Status::IllegalState(err_str); } string procname = Substitute("$0 (pid $1)", argv0(), pid()); // This is a fatal error because all errors in Kill() are signal-independent, // so Kill(SIGKILL) is just as likely to fail if this did. RETURN_NOT_OK_PREPEND( Kill(signal), Substitute("Failed to send signal $0 to $1", signal, procname)); if (signal == SIGKILL) { RETURN_NOT_OK_PREPEND( Wait(), Substitute("Failed to wait on $0", procname)); } else { Status s; Stopwatch sw; sw.start(); do { s = WaitNoBlock(); if (s.ok()) { break; } else if (!s.IsTimedOut()) { // An unexpected error in WaitNoBlock() is likely to manifest repeatedly, // so there's no point in retrying this. RETURN_NOT_OK_PREPEND( s, Substitute("Unexpected failure while waiting on $0", procname)); } SleepFor(MonoDelta::FromMilliseconds(10)); } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds); if (s.IsTimedOut()) { return KillAndWait(SIGKILL); } } return Status::OK(); } Status Subprocess::GetExitStatus(int* exit_status, string* info_str) const { if (state_ != kExited) { const string err_str = "Sub-process termination hasn't yet been detected"; LOG(DFATAL) << err_str; return Status::IllegalState(err_str); } string info; int status; if (WIFEXITED(wait_status_)) { status = WEXITSTATUS(wait_status_); if (status == 0) { info = Substitute("$0: process successfully exited", program_); } else { info = Substitute("$0: process exited with non-zero status $1", program_, status); } } else if (WIFSIGNALED(wait_status_)) { // Using signal number as exit status. status = WTERMSIG(wait_status_); info = Substitute("$0: process exited on signal $1", program_, status); #if defined(WCOREDUMP) if (WCOREDUMP(wait_status_)) { SubstituteAndAppend(&info, " (core dumped)"); } #endif } else { status = -1; info = Substitute("$0: process reported unexpected wait status $1", program_, wait_status_); LOG(DFATAL) << info; } if (exit_status) { *exit_status = status; } if (info_str) { *info_str = info; } return Status::OK(); } Status Subprocess::Call(const string& arg_str) { vector<string> argv = Split(arg_str, " "); return Call(argv, "", nullptr, nullptr); } Status Subprocess::Call(const vector<string>& argv, const string& stdin_in, string* stdout_out, string* stderr_out, map<string, string> env_vars) { Subprocess p(argv); if (stdout_out) { p.ShareParentStdout(false); } if (stderr_out) { p.ShareParentStderr(false); } if (!env_vars.empty()) { p.SetEnvVars(std::move(env_vars)); } RETURN_NOT_OK_PREPEND(p.Start(), "Unable to fork " + argv[0]); if (!stdin_in.empty()) { ssize_t written; RETRY_ON_EINTR(written, write(p.to_child_stdin_fd(), stdin_in.data(), stdin_in.size())); if (written < stdin_in.size()) { return Status::IOError("Unable to write to child process stdin", ErrnoToString(errno), errno); } } int err; RETRY_ON_EINTR(err, close(p.ReleaseChildStdinFd())); if (PREDICT_FALSE(err != 0)) { return Status::IOError("Unable to close child process stdin", ErrnoToString(errno), errno); } vector<int> fds; if (stdout_out) { fds.push_back(p.from_child_stdout_fd()); } if (stderr_out) { fds.push_back(p.from_child_stderr_fd()); } vector<string> outv; RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &outv)); // Given that ReadFdsFully captures the strings in the order in which we // had installed 'fds' above, it can be assured that we can receive // as many strings as there were 'fds' in the vector and in that order. CHECK_EQ(outv.size(), fds.size()); if (stdout_out) { *stdout_out = std::move(outv.front()); } if (stderr_out) { *stderr_out = std::move(outv.back()); } RETURN_NOT_OK_PREPEND(p.Wait(), "Unable to wait() for " + argv[0]); int exit_status; string exit_info_str; RETURN_NOT_OK(p.GetExitStatus(&exit_status, &exit_info_str)); if (exit_status != 0) { return Status::RuntimeError(exit_info_str); } return Status::OK(); } pid_t Subprocess::pid() const { CHECK_EQ(state_, kRunning); return child_pid_; } Status Subprocess::DoWait(int* wait_status, WaitMode mode) { if (state_ == kExited) { if (wait_status) { *wait_status = wait_status_; } return Status::OK(); } if (state_ != kRunning) { const string err_str = Substitute("$0: illegal sub-process state", state_); LOG(DFATAL) << err_str; return Status::IllegalState(err_str); } const int options = (mode == NON_BLOCKING) ? WNOHANG : 0; int status; int rc; RETRY_ON_EINTR(rc, waitpid(child_pid_, &status, options)); if (rc == -1) { return Status::RuntimeError("Unable to wait on child", ErrnoToString(errno), errno); } if (mode == NON_BLOCKING && rc == 0) { return Status::TimedOut(""); } CHECK_EQ(rc, child_pid_); CHECK(WIFEXITED(status) || WIFSIGNALED(status)); child_pid_ = -1; wait_status_ = status; state_ = kExited; if (wait_status) { *wait_status = status; } return Status::OK(); } void Subprocess::SetEnvVars(map<string, string> env) { CHECK_EQ(state_, kNotStarted); env_ = std::move(env); } void Subprocess::SetCurrentDir(string cwd) { CHECK_EQ(state_, kNotStarted); cwd_ = std::move(cwd); } void Subprocess::SetFdShared(int stdfd, bool share) { CHECK_EQ(state_, kNotStarted); fd_state_[stdfd] = share ? SHARED : PIPED; } void Subprocess::DisableStderr() { CHECK_EQ(state_, kNotStarted); fd_state_[STDERR_FILENO] = DISABLED; } void Subprocess::DisableStdout() { CHECK_EQ(state_, kNotStarted); fd_state_[STDOUT_FILENO] = DISABLED; } int Subprocess::CheckAndOffer(int stdfd) const { CHECK_EQ(state_, kRunning); CHECK_EQ(fd_state_[stdfd], PIPED); return child_fds_[stdfd]; } int Subprocess::ReleaseChildFd(int stdfd) { CHECK_EQ(state_, kRunning); CHECK_GE(child_fds_[stdfd], 0); CHECK_EQ(fd_state_[stdfd], PIPED); int ret = child_fds_[stdfd]; child_fds_[stdfd] = -1; return ret; } bool Subprocess::IsStarted() { return state_ != kNotStarted; } } // namespace kudu