int Subprocess::prepareChild()

in folly/Subprocess.cpp [581:1106]


int Subprocess::prepareChild(
    const Options& options,
    const sigset_t* sigmask,
    const char* childDir) const {
  // While all signals are blocked, we must reset their
  // dispositions to default.
  for (int sig = 1; sig < NSIG; ++sig) {
    ::signal(sig, SIG_DFL);
  }

  {
    // Unblock signals; restore signal mask.
    int r = pthread_sigmask(SIG_SETMASK, sigmask, nullptr);
    if (r != 0) {
      return r; // pthread_sigmask() returns an errno value
    }
  }

  // Change the working directory, if one is given
  if (childDir) {
    if (::chdir(childDir) == -1) {
      return errno;
    }
  }

#ifdef __linux__
  // Best effort
  if (options.cpuSet_.hasValue()) {
    const auto& cpuSet = options.cpuSet_.value();
    ::sched_setaffinity(0, sizeof(cpuSet), &cpuSet);
  }
#endif

  // We don't have to explicitly close the parent's end of all pipes,
  // as they all have the FD_CLOEXEC flag set and will be closed at
  // exec time.

  // Redirect requested FDs to /dev/null or NUL
  // dup2 any explicitly specified FDs
  for (auto& p : options.fdActions_) {
    if (p.second == DEV_NULL) {
      // folly/portability/Fcntl provides an impl of open that will
      // map this to NUL on Windows.
      auto devNull = ::open("/dev/null", O_RDWR | O_CLOEXEC);
      if (devNull == -1) {
        return errno;
      }
      // note: dup2 will not set CLOEXEC on the destination
      if (::dup2(devNull, p.first) == -1) {
        // explicit close on error to avoid leaking fds
        ::close(devNull);
        return errno;
      }
      ::close(devNull);
    } else if (p.second != p.first) {
      if (::dup2(p.second, p.first) == -1) {
        return errno;
      }
    }
  }

  if (options.closeOtherFds_) {
    closeInheritedFds(options.fdActions_);
  }

#if defined(__linux__)
  // Opt to receive signal on parent death, if requested
  if (options.parentDeathSignal_ != 0) {
    const auto parentDeathSignal =
        static_cast<unsigned long>(options.parentDeathSignal_);
    if (prctl(PR_SET_PDEATHSIG, parentDeathSignal, 0, 0, 0) == -1) {
      return errno;
    }
  }
#endif

  if (options.processGroupLeader_) {
#if !defined(__FreeBSD__)
    if (setpgrp() == -1) {
#else
    if (setpgrp(getpid(), getpgrp()) == -1) {
#endif
      return errno;
    }
  }

  // The user callback comes last, so that the child is otherwise all set up.
  if (options.dangerousPostForkPreExecCallback_) {
    if (int error = (*options.dangerousPostForkPreExecCallback_)()) {
      return error;
    }
  }

  return 0;
}

int Subprocess::runChild(
    const char* executable,
    char** argv,
    char** env,
    const Options& options) const {
  // Now, finally, exec.
  if (options.usePath_) {
    ::execvp(executable, argv);
  } else {
    ::execve(executable, argv, env);
  }
  return errno;
}

void Subprocess::readChildErrorPipe(int pfd, const char* executable) {
  ChildErrorInfo info;
  auto rc = readNoInt(pfd, &info, sizeof(info));
  if (rc == 0) {
    // No data means the child executed successfully, and the pipe
    // was closed due to the close-on-exec flag being set.
    return;
  } else if (rc != sizeof(ChildErrorInfo)) {
    // An error occurred trying to read from the pipe, or we got a partial read.
    // Neither of these cases should really occur in practice.
    //
    // We can't get any error data from the child in this case, and we don't
    // know if it is successfully running or not.  All we can do is to return
    // normally, as if the child executed successfully.  If something bad
    // happened the caller should at least get a non-normal exit status from
    // the child.
    XLOGF(
        ERR,
        "unexpected error trying to read from child error pipe rc={}, errno={}",
        rc,
        errno);
    return;
  }

  // We got error data from the child.  The child should exit immediately in
  // this case, so wait on it to clean up.
  wait();

  // Throw to signal the error
  throw SubprocessSpawnError(executable, info.errCode, info.errnoValue);
}

ProcessReturnCode Subprocess::poll(struct rusage* ru) {
  returnCode_.enforce(ProcessReturnCode::RUNNING);
  DCHECK_GT(pid_, 0);
  int status;
  pid_t found = ::wait4(pid_, &status, WNOHANG, ru);
  // The spec guarantees that EINTR does not occur with WNOHANG, so the only
  // two remaining errors are ECHILD (other code reaped the child?), or
  // EINVAL (cosmic rays?), both of which merit an abort:
  PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, WNOHANG)";
  if (found != 0) {
    // Though the child process had quit, this call does not close the pipes
    // since its descendants may still be using them.
    returnCode_ = ProcessReturnCode::make(status);
    pid_ = -1;
  }
  return returnCode_;
}

bool Subprocess::pollChecked() {
  if (poll().state() == ProcessReturnCode::RUNNING) {
    return false;
  }
  checkStatus(returnCode_);
  return true;
}

ProcessReturnCode Subprocess::wait() {
  returnCode_.enforce(ProcessReturnCode::RUNNING);
  DCHECK_GT(pid_, 0);
  int status;
  pid_t found;
  do {
    found = ::waitpid(pid_, &status, 0);
  } while (found == -1 && errno == EINTR);
  // The only two remaining errors are ECHILD (other code reaped the
  // child?), or EINVAL (cosmic rays?), and both merit an abort:
  PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, 0)";
  // Though the child process had quit, this call does not close the pipes
  // since its descendants may still be using them.
  DCHECK_EQ(found, pid_);
  returnCode_ = ProcessReturnCode::make(status);
  pid_ = -1;
  return returnCode_;
}

void Subprocess::waitChecked() {
  wait();
  checkStatus(returnCode_);
}

ProcessReturnCode Subprocess::waitTimeout(TimeoutDuration timeout) {
  returnCode_.enforce(ProcessReturnCode::RUNNING);
  DCHECK_GT(pid_, 0) << "The subprocess has been waited already";

  auto pollUntil = std::chrono::steady_clock::now() + timeout;
  auto sleepDuration = std::chrono::milliseconds{2};
  constexpr auto maximumSleepDuration = std::chrono::milliseconds{100};

  for (;;) {
    // Always call waitpid once after the full timeout has elapsed.
    auto now = std::chrono::steady_clock::now();

    int status;
    pid_t found;
    do {
      found = ::waitpid(pid_, &status, WNOHANG);
    } while (found == -1 && errno == EINTR);
    PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, WNOHANG)";
    if (found) {
      // Just on the safe side, make sure it's the actual pid we are waiting.
      DCHECK_EQ(found, pid_);
      returnCode_ = ProcessReturnCode::make(status);
      // Change pid_ to -1 to detect programming error like calling
      // this method multiple times.
      pid_ = -1;
      return returnCode_;
    }
    if (now > pollUntil) {
      // Timed out: still running().
      return returnCode_;
    }
    // The subprocess is still running, sleep for increasing periods of time.
    std::this_thread::sleep_for(sleepDuration);
    sleepDuration =
        std::min(maximumSleepDuration, sleepDuration + sleepDuration);
  }
}

void Subprocess::sendSignal(int signal) {
  returnCode_.enforce(ProcessReturnCode::RUNNING);
  int r = ::kill(pid_, signal);
  checkUnixError(r, "kill");
}

ProcessReturnCode Subprocess::waitOrTerminateOrKill(
    TimeoutDuration waitTimeout, TimeoutDuration sigtermTimeout) {
  returnCode_.enforce(ProcessReturnCode::RUNNING);
  DCHECK_GT(pid_, 0) << "The subprocess has been waited already";

  this->waitTimeout(waitTimeout);

  if (returnCode_.running()) {
    return terminateOrKill(sigtermTimeout);
  }
  return returnCode_;
}

ProcessReturnCode Subprocess::terminateOrKill(TimeoutDuration sigtermTimeout) {
  returnCode_.enforce(ProcessReturnCode::RUNNING);
  DCHECK_GT(pid_, 0) << "The subprocess has been waited already";
  // 1. Send SIGTERM to kill the process
  terminate();
  // 2. check whether subprocess has terminated using non-blocking waitpid
  waitTimeout(sigtermTimeout);
  if (!returnCode_.running()) {
    return returnCode_;
  }
  // 3. If we are at this point, we have waited enough time after
  // sending SIGTERM, we have to use nuclear option SIGKILL to kill
  // the subprocess.
  XLOGF(INFO, "Send SIGKILL to {}", pid_);
  kill();
  // 4. SIGKILL should kill the process otherwise there must be
  // something seriously wrong, just use blocking wait to wait for the
  // subprocess to finish.
  return wait();
}

pid_t Subprocess::pid() const {
  return pid_;
}

namespace {

ByteRange queueFront(const IOBufQueue& queue) {
  auto* p = queue.front();
  if (!p) {
    return ByteRange{};
  }
  return io::Cursor(p).peekBytes();
}

// fd write
bool handleWrite(int fd, IOBufQueue& queue) {
  for (;;) {
    auto b = queueFront(queue);
    if (b.empty()) {
      return true; // EOF
    }

    ssize_t n = writeNoInt(fd, b.data(), b.size());
    if (n == -1 && errno == EAGAIN) {
      return false;
    }
    checkUnixError(n, "write");
    queue.trimStart(n);
  }
}

// fd read
bool handleRead(int fd, IOBufQueue& queue) {
  for (;;) {
    auto p = queue.preallocate(100, 65000);
    ssize_t n = readNoInt(fd, p.first, p.second);
    if (n == -1 && errno == EAGAIN) {
      return false;
    }
    checkUnixError(n, "read");
    if (n == 0) {
      return true;
    }
    queue.postallocate(n);
  }
}

bool discardRead(int fd) {
  static const size_t bufSize = 65000;
  // Thread unsafe, but it doesn't matter.
  static std::unique_ptr<char[]> buf(new char[bufSize]);

  for (;;) {
    ssize_t n = readNoInt(fd, buf.get(), bufSize);
    if (n == -1 && errno == EAGAIN) {
      return false;
    }
    checkUnixError(n, "read");
    if (n == 0) {
      return true;
    }
  }
}

} // namespace

std::pair<std::string, std::string> Subprocess::communicate(StringPiece input) {
  IOBufQueue inputQueue;
  inputQueue.wrapBuffer(input.data(), input.size());

  auto outQueues = communicateIOBuf(std::move(inputQueue));
  auto outBufs =
      std::make_pair(outQueues.first.move(), outQueues.second.move());
  std::pair<std::string, std::string> out;
  if (outBufs.first) {
    outBufs.first->coalesce();
    out.first.assign(
        reinterpret_cast<const char*>(outBufs.first->data()),
        outBufs.first->length());
  }
  if (outBufs.second) {
    outBufs.second->coalesce();
    out.second.assign(
        reinterpret_cast<const char*>(outBufs.second->data()),
        outBufs.second->length());
  }
  return out;
}

std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
    IOBufQueue input) {
  // If the user supplied a non-empty input buffer, make sure
  // that stdin is a pipe so we can write the data.
  if (!input.empty()) {
    // findByChildFd() will throw std::invalid_argument if no pipe for
    // STDIN_FILENO exists
    findByChildFd(STDIN_FILENO);
  }

  std::pair<IOBufQueue, IOBufQueue> out;

  auto readCallback = [&](int pfd, int cfd) -> bool {
    if (cfd == STDOUT_FILENO) {
      return handleRead(pfd, out.first);
    } else if (cfd == STDERR_FILENO) {
      return handleRead(pfd, out.second);
    } else {
      // Don't close the file descriptor, the child might not like SIGPIPE,
      // just read and throw the data away.
      return discardRead(pfd);
    }
  };

  auto writeCallback = [&](int pfd, int cfd) -> bool {
    if (cfd == STDIN_FILENO) {
      return handleWrite(pfd, input);
    } else {
      // If we don't want to write to this fd, just close it.
      return true;
    }
  };

  communicate(std::move(readCallback), std::move(writeCallback));

  return out;
}

void Subprocess::communicate(
    FdCallback readCallback, FdCallback writeCallback) {
  // This serves to prevent wait() followed by communicate(), but if you
  // legitimately need that, send a patch to delete this line.
  returnCode_.enforce(ProcessReturnCode::RUNNING);
  setAllNonBlocking();

  std::vector<pollfd> fds;
  fds.reserve(pipes_.size());
  std::vector<size_t> toClose; // indexes into pipes_
  toClose.reserve(pipes_.size());

  while (!pipes_.empty()) {
    fds.clear();
    toClose.clear();

    for (auto& p : pipes_) {
      pollfd pfd;
      pfd.fd = p.pipe.fd();
      // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
      // child's point of view.
      if (!p.enabled) {
        // Still keeping fd in watched set so we get notified of POLLHUP /
        // POLLERR
        pfd.events = 0;
      } else if (p.direction == PIPE_IN) {
        pfd.events = POLLOUT;
      } else {
        pfd.events = POLLIN;
      }
      fds.push_back(pfd);
    }

    int r;
    do {
      r = ::poll(fds.data(), fds.size(), -1);
    } while (r == -1 && errno == EINTR);
    checkUnixError(r, "poll");

    for (size_t i = 0; i < pipes_.size(); ++i) {
      auto& p = pipes_[i];
      auto parentFd = p.pipe.fd();
      DCHECK_EQ(fds[i].fd, parentFd);
      short events = fds[i].revents;

      bool closed = false;
      if (events & POLLOUT) {
        DCHECK(!(events & POLLIN));
        if (writeCallback(parentFd, p.childFd)) {
          toClose.push_back(i);
          closed = true;
        }
      }

      // Call read callback on POLLHUP, to give it a chance to read (and act
      // on) end of file
      if (events & (POLLIN | POLLHUP)) {
        DCHECK(!(events & POLLOUT));
        if (readCallback(parentFd, p.childFd)) {
          toClose.push_back(i);
          closed = true;
        }
      }

      if ((events & (POLLHUP | POLLERR)) && !closed) {
        toClose.push_back(i);
      }
    }

    // Close the fds in reverse order so the indexes hold after erase()
    for (int idx : boost::adaptors::reverse(toClose)) {
      auto pos = pipes_.begin() + idx;
      pos->pipe.close(); // Throws on error
      pipes_.erase(pos);
    }
  }
}

void Subprocess::enableNotifications(int childFd, bool enabled) {
  pipes_[findByChildFd(childFd)].enabled = enabled;
}

bool Subprocess::notificationsEnabled(int childFd) const {
  return pipes_[findByChildFd(childFd)].enabled;
}

size_t Subprocess::findByChildFd(int childFd) const {
  auto pos = std::lower_bound(
      pipes_.begin(), pipes_.end(), childFd, [](const Pipe& pipe, int fd) {
        return pipe.childFd < fd;
      });
  if (pos == pipes_.end() || pos->childFd != childFd) {
    throw std::invalid_argument(
        folly::to<std::string>("child fd not found ", childFd));
  }
  return pos - pipes_.begin();
}

void Subprocess::closeParentFd(int childFd) {
  int idx = findByChildFd(childFd);
  pipes_[idx].pipe.close(); // May throw
  pipes_.erase(pipes_.begin() + idx);
}

std::vector<Subprocess::ChildPipe> Subprocess::takeOwnershipOfPipes() {
  std::vector<Subprocess::ChildPipe> pipes;
  for (auto& p : pipes_) {
    pipes.emplace_back(p.childFd, std::move(p.pipe));
  }
  // release memory
  std::vector<Pipe>().swap(pipes_);
  return pipes;
}

namespace {

class Initializer {
 public:
  Initializer() {
    // We like EPIPE, thanks.
    ::signal(SIGPIPE, SIG_IGN);
  }
};

Initializer initializer;

} // namespace

} // namespace folly