in watchman/ChildProcess.cpp [436:599]
/**
 * Exchange data with the child over its redirected pipes using poll(2),
 * looping until every entry in pipes_ has been closed and removed.
 *
 * pipes_ is keyed by the child-side descriptor number. The STDIN_FILENO
 * entry is polled for writability on its write end; every other entry is
 * polled for readability on its read end and its data is accumulated.
 *
 * @param writeCallback invoked with the stdin pipe's write end whenever
 *        poll reports it writable; a true result means the caller has
 *        finished writing and the pipe should be closed.
 * @return (stdout, stderr) captured output. A side for which no output
 *         buffer exists is returned as a w_string built from nullptr;
 *         a piped stream that produced no data yields an empty string.
 * @throws std::system_error if poll() fails (other than EINTR) or if
 *         read() fails (other than EAGAIN/EINTR).
 */
std::pair<w_string, w_string> ChildProcess::pollingCommunicate(
    pipeWriteCallback writeCallback) {
  // Captured output, keyed by the child-side descriptor number.
  std::unordered_map<int, std::string> outputs;

  for (auto& it : pipes_) {
    if (it.first == STDIN_FILENO) {
      // We only want output streams here
      continue;
    }
    watchman::log(
        watchman::DBG, "Setting up output buffer for fd ", it.first, "\n");
    outputs.emplace(std::make_pair(it.first, ""));
  }

  std::vector<pollfd> pfds;
  // Maps the descriptor handed to poll() back to its key in pipes_.
  std::unordered_map<int, int> revmap;
  pfds.reserve(pipes_.size());
  revmap.reserve(pipes_.size());

  while (!pipes_.empty()) {
    // pipes_ shrinks as streams finish, so rebuild the poll set each pass.
    revmap.clear();
    pfds.clear();

    watchman::log(
        watchman::DBG, "Setting up pollfds for ", pipes_.size(), " fds\n");

    for (auto& it : pipes_) {
      pollfd pfd{};
      if (it.first == STDIN_FILENO) {
        pfd.fd = it.second->write.fd();
        pfd.events = POLLOUT;
      } else {
        pfd.fd = it.second->read.fd();
        pfd.events = POLLIN;
      }
      revmap[pfd.fd] = it.first;
      pfds.emplace_back(pfd);
    }

    int r;
    do {
      watchman::log(watchman::DBG, "waiting for ", pfds.size(), " fds\n");
      r = ::poll(pfds.data(), pfds.size(), -1);
    } while (r == -1 && errno == EINTR);

    if (r == -1) {
      // Snapshot errno before logging; the logger may overwrite it.
      const int pollErr = errno;
      watchman::log(watchman::ERR, "poll error\n");
      throw std::system_error(pollErr, std::generic_category(), "poll");
    }

    for (auto& pfd : pfds) {
      // Key of this descriptor in pipes_ (and in outputs).
      const int childFd = revmap[pfd.fd];
      const bool isStdin = childFd == STDIN_FILENO;

      watchman::log(
          watchman::DBG,
          "fd ",
          pfd.fd,
          " revmap to ",
          childFd,
          " has events ",
          pfd.revents,
          "\n");

      // POLLHUP is treated as readable too, so that read() can observe
      // the end of the stream (a zero-length read) and close the pipe.
      if ((pfd.revents & (POLLHUP | POLLIN)) && !isStdin) {
        watchman::log(
            watchman::DBG, "fd ", pfd.fd, " rev=", childFd, " is readable\n");

        char buf[BUFSIZ];
        auto l = ::read(pfd.fd, buf, sizeof(buf));

        if (l == -1 && (errno == EAGAIN || errno == EINTR)) {
          watchman::log(
              watchman::DBG,
              "fd ",
              pfd.fd,
              " rev=",
              childFd,
              " read give EAGAIN\n");
          continue;
        }

        if (l == -1) {
          int err = errno;
          watchman::log(
              watchman::ERR,
              "failed to read from pipe fd ",
              pfd.fd,
              " err ",
              folly::errnoStr(err),
              "\n");
          throw std::system_error(
              err, std::generic_category(), "reading from child process");
        }

        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev=",
            childFd,
            " read ",
            l,
            " bytes\n");

        if (l == 0) {
          // Stream is done; close it out.
          pipes_.erase(childFd);
          continue;
        }

        outputs[childFd].append(buf, l);
      }

      if ((pfd.revents & POLLHUP) && isStdin) {
        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev ",
            childFd,
            " closed by the other side\n");
        pipes_.erase(childFd);
        continue;
      }

      if ((pfd.revents & POLLOUT) && isStdin &&
          writeCallback(pipes_.at(childFd)->write)) {
        // We should close it
        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev ",
            childFd,
            " writer says to close\n");
        pipes_.erase(childFd);
        continue;
      }

      if (pfd.revents & POLLERR) {
        // Something wrong with it, so close it
        pipes_.erase(childFd);
        watchman::log(
            watchman::DBG,
            "fd ",
            pfd.fd,
            " rev ",
            childFd,
            " error status, so closing\n");
        continue;
      }
    }

    watchman::log(watchman::DBG, "remaining pipes ", pipes_.size(), "\n");
  }

  // Convert a captured buffer to a w_string; a descriptor with no buffer
  // is reported as a w_string constructed from nullptr.
  auto optBuffer = [&](int fd) -> w_string {
    auto it = outputs.find(fd);
    if (it == outputs.end()) {
      watchman::log(watchman::DBG, "communicate fd ", fd, " nullptr\n");
      return nullptr;
    }
    watchman::log(
        watchman::DBG, "communicate fd ", fd, " gives ", it->second, "\n");
    return w_string(it->second.data(), it->second.size());
  };

  return std::make_pair(optBuffer(STDOUT_FILENO), optBuffer(STDERR_FILENO));
}