std::pair ChildProcess::pollingCommunicate()

in watchman/ChildProcess.cpp [436:599]


// Exchange data with the child over every pipe in pipes_ using poll(2),
// running until all of those pipes have been closed out.
//
// writeCallback is invoked with the write end of the stdin pipe whenever
// poll reports it writable; a true return from the callback means the
// pipe should be closed.
//
// Returns {stdout contents, stderr contents}.  An element is a null
// w_string when no buffer was set up for that stream (i.e. there was no
// corresponding entry in pipes_ and nothing was read for it).
//
// Throws std::system_error if poll() fails (other than EINTR) or if
// read() fails (other than EAGAIN/EINTR).
std::pair<w_string, w_string> ChildProcess::pollingCommunicate(
    pipeWriteCallback writeCallback) {
  std::unordered_map<int, std::string> outputs;

  // Pre-register an (empty) buffer for each output stream so that a piped
  // stream that produces no data still yields an empty string rather than
  // a null one.  stdin is input to the child, so it gets no buffer.
  for (auto& it : pipes_) {
    if (it.first == STDIN_FILENO) {
      // We only want output streams here
      continue;
    }
    watchman::log(
        watchman::DBG, "Setting up output buffer for fd ", it.first, "\n");
    outputs.emplace(it.first, std::string());
  }

  std::vector<pollfd> pfds;
  // Maps the parent-side descriptor being polled back to the key used in
  // pipes_ (STDIN_FILENO / STDOUT_FILENO / STDERR_FILENO / ...).
  std::unordered_map<int, int> revmap;
  pfds.reserve(pipes_.size());
  revmap.reserve(pipes_.size());

  while (!pipes_.empty()) {
    // pipes_ shrinks as streams finish, so rebuild the poll set each pass.
    revmap.clear();
    pfds.clear();

    watchman::log(
        watchman::DBG, "Setting up pollfds for ", pipes_.size(), " fds\n");

    for (auto& it : pipes_) {
      pollfd pfd{};
      if (it.first == STDIN_FILENO) {
        pfd.fd = it.second->write.fd();
        pfd.events = POLLOUT;
      } else {
        pfd.fd = it.second->read.fd();
        pfd.events = POLLIN;
      }
      revmap[pfd.fd] = it.first;
      pfds.push_back(pfd);
    }

    int r;
    do {
      watchman::log(watchman::DBG, "waiting for ", pfds.size(), " fds\n");
      r = ::poll(pfds.data(), pfds.size(), -1);
    } while (r == -1 && errno == EINTR);
    if (r == -1) {
      // Capture errno before logging; the log call may overwrite it.
      const int err = errno;
      watchman::log(watchman::ERR, "poll error\n");
      throw std::system_error(err, std::generic_category(), "poll");
    }

    for (const auto& pfd : pfds) {
      // Resolve the pipes_ key once instead of re-hashing on every use.
      const int childFd = revmap.at(pfd.fd);
      const bool isStdin = (childFd == STDIN_FILENO);

      watchman::log(
          watchman::DBG,
          "fd ",
          pfd.fd,
          " revmap to ",
          childFd,
          " has events ",
          pfd.revents,
          "\n");

      // POLLHUP is treated as readable too, so that any remaining data is
      // drained and the eventual zero-length read closes the stream.
      if ((pfd.revents & (POLLHUP | POLLIN)) && !isStdin) {
        watchman::log(
            watchman::DBG, "fd ", pfd.fd, " rev=", childFd, " is readable\n");
        char buf[BUFSIZ];
        auto l = ::read(pfd.fd, buf, sizeof(buf));
        if (l == -1 && (errno == EAGAIN || errno == EINTR)) {
          watchman::log(
              watchman::DBG,
              "fd ",
              pfd.fd,
              " rev=",
              childFd,
              " read give EAGAIN\n");
          continue;
        }
        if (l == -1) {
          const int err = errno;
          watchman::log(
              watchman::ERR,
              "failed to read from pipe fd ",
              pfd.fd,
              " err ",
              folly::errnoStr(err),
              "\n");
          throw std::system_error(
              err, std::generic_category(), "reading from child process");
        }
        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev=",
            childFd,
            " read ",
            l,
            " bytes\n");
        if (l == 0) {
          // Stream is done; close it out.
          pipes_.erase(childFd);
          continue;
        }
        outputs[childFd].append(buf, static_cast<std::size_t>(l));
      }

      if ((pfd.revents & POLLHUP) && isStdin) {
        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev ",
            childFd,
            " closed by the other side\n");
        pipes_.erase(childFd);
        continue;
      }
      if ((pfd.revents & POLLOUT) && isStdin &&
          writeCallback(pipes_.at(childFd)->write)) {
        // We should close it
        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev ",
            childFd,
            " writer says to close\n");
        pipes_.erase(childFd);
        continue;
      }

      // POLLNVAL is handled alongside POLLERR: an invalid descriptor makes
      // poll() return immediately every time, so leaving the pipe in place
      // would spin this loop forever.
      if (pfd.revents & (POLLERR | POLLNVAL)) {
        // Something wrong with it, so close it
        pipes_.erase(childFd);
        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev ",
            childFd,
            " error status, so closing\n");
        continue;
      }
    }

    watchman::log(watchman::DBG, "remaining pipes ", pipes_.size(), "\n");
  }

  // Convert the collected buffer for `fd` into a w_string, or a null
  // w_string if no buffer exists for that stream.
  auto optBuffer = [&](int fd) -> w_string {
    auto it = outputs.find(fd);
    if (it == outputs.end()) {
      watchman::log(watchman::DBG, "communicate fd ", fd, " nullptr\n");
      return nullptr;
    }
    watchman::log(
        watchman::DBG, "communicate fd ", fd, " gives ", it->second, "\n");
    return w_string(it->second.data(), it->second.size());
  };

  return std::make_pair(optBuffer(STDOUT_FILENO), optBuffer(STDERR_FILENO));
}