Status Subprocess::Start()

in src/kudu/util/subprocess.cc [321:558]


// Forks the current process and exec's 'program_' with 'argv_' in the child,
// wiring the child's stdin/stdout/stderr according to 'fd_state_'.
//
// Returns:
//   - IllegalState    if 'state_' is not kNotStarted;
//   - InvalidArgument if 'argv_' is empty;
//   - the error from OpenProcFdDir() (with a prefix) if the fd directory
//     cannot be opened;
//   - RuntimeError    if fork() fails or if reading from the synchronization
//     pipe fails.
// On success 'state_' is set to kRunning, 'child_pid_' holds the pid returned
// by fork(), and 'child_fds_' holds the parent's ends of the pipes (-1 for
// the descriptors which are not PIPED).
//
// The function returns to the caller in the parent only after the child
// has invoked execvp() (or has exited): this is ensured by a blocking read on
// a close-on-exec synchronization pipe.
Status Subprocess::Start() {
  VLOG(2) << "Invoking command: " << argv_;
  if (state_ != kNotStarted) {
    const string err_str = Substitute("$0: illegal sub-process state", state_.load());
    LOG(DFATAL) << err_str;
    return Status::IllegalState(err_str);
  }
  if (argv_.empty()) {
    return Status::InvalidArgument("argv must have at least one elem");
  }

  // We explicitly set SIGPIPE to SIG_IGN here because we are using UNIX pipes.
  IgnoreSigPipe();

  // Build the nullptr-terminated argument array for execvp(). The pointers
  // refer to the storage of the strings in 'argv_'.
  vector<char*> argv_ptrs;
  for (const string& arg : argv_) {
    argv_ptrs.push_back(const_cast<char*>(arg.c_str()));
  }
  argv_ptrs.push_back(nullptr);

  // Pipe from caller process to child's stdin
  // [0] = stdin for child, [1] = how parent writes to it
  int child_stdin[2] = {-1, -1};
  if (fd_state_[STDIN_FILENO] == PIPED) {
    PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0);
  }
  // Pipe from child's stdout back to caller process
  // [0] = how parent reads from child's stdout, [1] = how child writes to it
  int child_stdout[2] = {-1, -1};
  if (fd_state_[STDOUT_FILENO] == PIPED) {
    PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0);
  }
  // Pipe from child's stderr back to caller process
  // [0] = how parent reads from child's stderr, [1] = how child writes to it
  int child_stderr[2] = {-1, -1};
  if (fd_state_[STDERR_FILENO] == PIPED) {
    PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0);
  }
  // The synchronization pipe: this trick is to make sure the parent returns
  // control only after the child process has invoked execvp().
  int sync_pipe[2] = {-1, -1};
  PCHECK(pipe2(sync_pipe, O_CLOEXEC) == 0);

  // Closes every pipe descriptor opened above. It is used on the error paths
  // which return before a child process exists, so the descriptors are not
  // leaked in the calling process.
  const auto close_pipe_fds = [&]() {
    int* const all_pipes[] = { child_stdin, child_stdout, child_stderr, sync_pipe };
    for (int* fds : all_pipes) {
      for (int idx = 0; idx < 2; ++idx) {
        if (fds[idx] == -1) {
          continue;
        }
        if (PREDICT_FALSE(close(fds[idx]) != 0)) {
          const int err = errno;
          RAW_LOG(WARNING, "could not close pipe descriptor %d: [%d]", fds[idx], err);
        }
        fds[idx] = -1;
      }
    }
  };

  DIR* fd_dir = nullptr;
  const Status fd_dir_status = OpenProcFdDir(&fd_dir);
  if (PREDICT_FALSE(!fd_dir_status.ok())) {
    close_pipe_fds();
    RETURN_NOT_OK_PREPEND(fd_dir_status, "Unable to open fd dir");
  }
  unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir,
                                                           CloseProcFdDir);
  int ret;
  RETRY_ON_EINTR(ret, fork());
  if (ret == -1) {
    // Capture errno before the cleanup: close() might overwrite it.
    const int err = errno;
    close_pipe_fds();
    return Status::RuntimeError("Unable to fork", ErrnoToString(err), err);
  }
  if (ret == 0) { // We are the child
    // As a general note, it's not safe to call non-async-signal-safe functions
    // in the child process between fork() and exec(). Surprisingly, a call to
    // LOG() locks a mutex that may have been copied from the parent's address
    // space in an already locked state, so it is not async-signal-safe and
    // can deadlock the child if called. So, in this vulnerable state the child
    // outputs log messages using RAW_LOG() instead directly into stderr.
    // RAW_LOG() uses vsnprintf() under the hood: it's not async-signal-safe
    // strictly speaking (might call malloc() and getenv() in some cases which
    // might acquire locks themselves), but it's much better than using LOG()
    // where it can simply deadlock on glog's mutex. BTW, some allocators like
    // tcmalloc install pthread_atfork() handlers, so with tcmalloc we have
    // more safety with vsnprintf().
    //
    // An alternative approach might be to use some additional functionality
    // in glog library (once implemented) to establish thread_atfork() handlers;
    // see https://github.com/robi56/google-glog/issues/101 for details.

    // Send the child a SIGKILL when the parent dies. This is done as early
    // as possible in the child's life to prevent any orphaning whatsoever
    // (e.g. from KUDU-402).
#if defined(__linux__)
    // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways
    // to prevent orphans when parent is killed.
    prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif

    // stdin
    if (fd_state_[STDIN_FILENO] == PIPED) {
      int dup2_ret;
      RETRY_ON_EINTR(dup2_ret, dup2(child_stdin[0], STDIN_FILENO));
      if (dup2_ret != STDIN_FILENO) {
        int err = errno;
        RAW_LOG(FATAL, "dup2() failed (STDIN): [%d]", err);
      }
    } else {
      RAW_DCHECK(SHARED == fd_state_[STDIN_FILENO],
                 "unexpected state of STDIN");
    }

    // stdout
    switch (fd_state_[STDOUT_FILENO]) {
      case PIPED: {
        int dup2_ret;
        RETRY_ON_EINTR(dup2_ret, dup2(child_stdout[1], STDOUT_FILENO));
        if (dup2_ret != STDOUT_FILENO) {
          int err = errno;
          RAW_LOG(FATAL, "dup2() failed (STDOUT): [%d]", err);
        }
        break;
      }
      case DISABLED: {
        RedirectToDevNull(STDOUT_FILENO);
        break;
      }
      default:
        RAW_DCHECK(SHARED == fd_state_[STDOUT_FILENO],
                   "unexpected state of STDOUT");
        break;
    }

    // stderr
    switch (fd_state_[STDERR_FILENO]) {
      case PIPED: {
        int dup2_ret;
        RETRY_ON_EINTR(dup2_ret, dup2(child_stderr[1], STDERR_FILENO));
        if (dup2_ret != STDERR_FILENO) {
          int err = errno;
          RAW_LOG(FATAL, "dup2() failed (STDERR): [%d]", err);
        }
        break;
      }
      case DISABLED: {
        RedirectToDevNull(STDERR_FILENO);
        break;
      }
      default:
        RAW_DCHECK(SHARED == fd_state_[STDERR_FILENO],
                   "unexpected state of STDERR");
        break;
    }

    // Close the read side of the sync pipe;
    // the write side should be closed upon execvp().
    if (PREDICT_FALSE(close(sync_pipe[0]) != 0)) {
      int err = errno;
      RAW_LOG(FATAL, "close() on the read side of sync pipe failed: [%d]", err);
    }

    CloseNonStandardFDs(fd_dir);

    // Ensure we are not ignoring or blocking signals in the child process.
    ResetAllSignalMasksToUnblocked();

    // Reset the disposition of SIGPIPE to SIG_DFL because we routinely set its
    // disposition to SIG_IGN via IgnoreSigPipe(). At the time of writing, we
    // don't explicitly ignore any other signals in Kudu.
    ResetSigPipeHandlerToDefault();

    // Set the current working directory of the subprocess.
    if (!cwd_.empty() && chdir(cwd_.c_str()) == -1) {
      int err = errno;
      RAW_LOG(FATAL, "chdir() to '%s' failed: [%d]", cwd_.c_str(), err);
    }

    // Set the environment for the subprocess. This is more portable than
    // using execvpe(), which doesn't exist on OS X. We rely on the 'p'
    // variant of exec to do $PATH searching if the executable specified
    // by the caller isn't an absolute path.
    //
    // A failure is reported via RAW_LOG() rather than PCHECK(): the latter
    // goes through LOG(), which must not be used between fork() and exec()
    // (see the note at the top of this branch).
    for (const auto& env : env_) {
      if (PREDICT_FALSE(setenv(env.first.c_str(), env.second.c_str(), 1 /* overwrite */) != 0)) {
        int err = errno;
        RAW_LOG(FATAL, "setenv() for '%s' failed: [%d]", env.first.c_str(), err);
      }
    }

    // execvp() returns only on failure.
    execvp(program_.c_str(), &argv_ptrs[0]);
    int err = errno;
    RAW_LOG(ERROR, "could not exec '%s': [%d]", program_.c_str(), err);
    _exit(err);
  } else {
    // We are the parent
    child_pid_ = ret;
    // Close child's side of the pipes
    if (fd_state_[STDIN_FILENO]  == PIPED) {
      if (PREDICT_FALSE(close(child_stdin[0]) != 0)) {
        const int err = errno;
        RAW_LOG(WARNING, "could not close child's STDIN: [%d]", err);
      }
    }
    if (fd_state_[STDOUT_FILENO] == PIPED) {
      if (PREDICT_FALSE(close(child_stdout[1]) != 0)) {
        const int err = errno;
        RAW_LOG(WARNING, "could not close child's STDOUT: [%d]", err);
      }
    }
    if (fd_state_[STDERR_FILENO] == PIPED) {
      if (PREDICT_FALSE(close(child_stderr[1]) != 0)) {
        const int err = errno;
        RAW_LOG(WARNING, "could not close child's STDERR: [%d]", err);
      }
    }
    // Keep parent's side of the pipes
    child_fds_[STDIN_FILENO]  = child_stdin[1];
    child_fds_[STDOUT_FILENO] = child_stdout[0];
    child_fds_[STDERR_FILENO] = child_stderr[0];

    // Wait for the child process to invoke execvp(). The trick involves
    // a pipe with O_CLOEXEC option for its descriptors. The parent process
    // performs blocking read from the pipe while the write side of the pipe
    // is kept open by the child (it does not write any data, though). The write
    // side of the pipe is closed when the child invokes execvp(). At that
    // point, the parent should receive EOF, i.e. read() should return 0.
    {
      // Close the write side of the sync pipe. It's crucial to make sure
      // it succeeds otherwise the blocking read() below might wait forever
      // even if the child process has closed the pipe.
      const int close_ret = close(sync_pipe[1]);
      PCHECK(close_ret == 0);
      while (true) {
        uint8_t buf;
        int read_errno = 0;
        int rc;
        RETRY_ON_EINTR(rc, read(sync_pipe[0], &buf, 1));
        if (rc == -1) {
          // Save errno now: the close() below might overwrite it.
          read_errno = errno;
        }
        if (PREDICT_FALSE(close(sync_pipe[0]) != 0)) {
          const int err = errno;
          RAW_LOG(FATAL, "could not close the read side of the sync pipe: [%d]", err);
        }
        if (rc == 0) {
          // That's OK -- expecting EOF from the other side of the pipe.
          break;
        }
        if (rc == -1) {
          // Other errors besides EINTR are not expected.
          // NOTE(review): at this point the child has already been forked and
          // 'child_pid_' is set, but 'state_' is left unchanged -- confirm the
          // callers/destructor deal with the child in this case.
          return Status::RuntimeError("Unexpected error from the sync pipe",
                                      ErrnoToString(read_errno), read_errno);
        }
        // No data is expected from the sync pipe.
        RAW_LOG(FATAL, "%d: unexpected data from the sync pipe", rc);
      }
    }
  }

  state_ = kRunning;
  return Status::OK();
}