ErrorCode ServerSocket::acceptNextConnection()

in util/ServerSocket.cpp [214:303]


/// Waits for an incoming connection on any of the listening fds and accepts
/// it.
///
/// Calls listen() first and returns its error code unchanged if it is not OK.
/// Then polls all fds in listeningFds_ until one becomes readable, the
/// timeout expires, or (when abort checking is enabled in the options) the
/// abort checker requests an abort.
///
/// @param timeoutMillis       total time budget for the wait, in
///                            milliseconds; must be > 0 (enforced with
///                            WDT_CHECK)
/// @param tryCurAddressFirst  if true, the scan for a readable fd starts at
///                            lastCheckedPollIndex_; otherwise it starts at
///                            the fd following it
///
/// @return OK          a connection was accepted and installed into socket_
///         ABORT       the abort checker requested an abort while waiting
///         CONN_ERROR  timeout, poll() failure, accept() failure, or no fd
///                     reported POLLIN
///         otherwise   whatever non-OK code listen() returned
ErrorCode ServerSocket::acceptNextConnection(int timeoutMillis,
                                             bool tryCurAddressFirst) {
  ErrorCode code = listen();
  if (code != OK) {
    return code;
  }
  WDT_CHECK(!listeningFds_.empty());
  WDT_CHECK(timeoutMillis > 0);

  auto port = socket_->getPort();
  auto fd = socket_->getFd();
  const WdtOptions &options = threadCtx_.getOptions();
  const bool checkAbort = (options.abort_check_interval_millis > 0);

  const int numFds = listeningFds_.size();
  struct pollfd pollFds[numFds];
  auto startTime = Clock::now();
  while (true) {
    // We need this loop because poll() can return before any file handle
    // becomes ready: it may be interrupted by a signal, or it may have been
    // called with a timeout shortened to the abort check interval. In either
    // case we retry with whatever is left of the overall timeout.
    int timeElapsed = durationMillis(Clock::now() - startTime);
    if (timeElapsed >= timeoutMillis) {
      WVLOG(3) << "accept() timed out";
      return CONN_ERROR;
    }
    int pollTimeout = timeoutMillis - timeElapsed;
    if (checkAbort) {
      if (threadCtx_.getAbortChecker()->shouldAbort()) {
        WLOG(ERROR) << "Transfer aborted during accept " << port << " " << fd;
        return ABORT;
      }
      // Wake up at least once per abort check interval so that an abort
      // request is noticed without waiting for the full timeout.
      pollTimeout = std::min(pollTimeout, options.abort_check_interval_millis);
    }
    // Rebuild the poll set on every iteration so revents starts out cleared.
    for (int i = 0; i < numFds; i++) {
      pollFds[i] = {listeningFds_[i], POLLIN, 0};
    }

    int retValue = poll(pollFds, numFds, pollTimeout);
    if (retValue > 0) {
      break;
    }
    if (retValue == 0) {
      // poll() does not set errno when it times out, so this case must be
      // handled before errno is inspected; otherwise a stale EINTR left over
      // from an earlier call would be misreported as an interruption.
      WVLOG(3) << "poll() timed out on port : " << port
               << ", listening fds : " << listeningFds_;
      continue;
    }
    // retValue < 0 : errno is only meaningful on this path
    if (errno == EINTR) {
      WVLOG(1) << "poll() call interrupted. retrying...";
      continue;
    }
    WPLOG(ERROR) << "poll() failed on port : " << port
                 << ", listening fds : " << listeningFds_;
    return CONN_ERROR;
  }

  if (lastCheckedPollIndex_ >= numFds) {
    // can happen if getaddrinfo returns different set of addresses
    lastCheckedPollIndex_ = 0;
  } else if (!tryCurAddressFirst) {
    // else try the next address
    lastCheckedPollIndex_ = (lastCheckedPollIndex_ + 1) % numFds;
  }

  // Scan every fd once, starting at lastCheckedPollIndex_ and wrapping
  // around, and accept on the first one that reported POLLIN.
  for (int count = 0; count < numFds; count++) {
    auto &pollFd = pollFds[lastCheckedPollIndex_];
    if (pollFd.revents & POLLIN) {
      struct sockaddr_storage addr;
      socklen_t addrLen = sizeof(addr);
      fd = accept(pollFd.fd, (struct sockaddr *)&addr, &addrLen);
      if (fd < 0) {
        WPLOG(ERROR) << "accept error";
        return CONN_ERROR;
      }
      WdtSocket::getNameInfo((struct sockaddr *)&addr, addrLen,
                             peerIp_, peerPort_);
      WVLOG(1) << "New connection, fd : " << fd << " from " << peerIp_ << " "
               << peerPort_;
      // Hand the accepted fd over to socket_ and configure it.
      socket_->setFd(fd);
      socket_->setSocketTimeouts();
      socket_->setDscp(options.dscp);

      return OK;
    }
    lastCheckedPollIndex_ = (lastCheckedPollIndex_ + 1) % numFds;
  }
  WLOG(ERROR) << "None of the listening fds got a POLLIN event " << port;
  return CONN_ERROR;
}