in util/ServerSocket.cpp [214:303]
// Waits for an incoming connection on any of the listening fds and accepts it.
//
// listen() is invoked first; if it does not return OK, that code is returned
// unchanged. The listening fds are then polled for POLLIN until one becomes
// readable, the timeout expires, or an abort is requested.
//
// @param timeoutMillis       total time to wait, in milliseconds; must be > 0
//                            (enforced with WDT_CHECK)
// @param tryCurAddressFirst  if true, the scan for a readable fd starts at
//                            lastCheckedPollIndex_; otherwise it starts at the
//                            index after it
//
// @return OK          a connection was accepted; peerIp_/peerPort_ are filled
//                     in and the accepted fd is installed on socket_
//         ABORT       abort checking is enabled and the abort checker fired
//         CONN_ERROR  the wait timed out, poll()/accept() failed, or poll()
//                     reported activity but no fd had POLLIN set
ErrorCode ServerSocket::acceptNextConnection(int timeoutMillis,
                                             bool tryCurAddressFirst) {
  ErrorCode code = listen();
  if (code != OK) {
    return code;
  }
  WDT_CHECK(!listeningFds_.empty());
  WDT_CHECK(timeoutMillis > 0);
  auto port = socket_->getPort();
  auto fd = socket_->getFd();
  const WdtOptions &options = threadCtx_.getOptions();
  const bool checkAbort = (options.abort_check_interval_millis > 0);
  const int numFds = listeningFds_.size();
  // NOTE: variable-length array (a compiler extension in C++). The WDT_CHECK
  // above asserts listeningFds_ is non-empty, so numFds is expected to be > 0
  // both here and in the modulo arithmetic further down.
  struct pollfd pollFds[numFds];
  auto startTime = Clock::now();
  while (true) {
    // we need this loop because poll() can return before any file handles
    // have changes or before timing out. In that case, we check whether it
    // is because of EINTR or not. If true, we have to try poll with
    // reduced timeout
    int timeElapsed = durationMillis(Clock::now() - startTime);
    if (timeElapsed >= timeoutMillis) {
      WVLOG(3) << "accept() timed out";
      return CONN_ERROR;
    }
    int pollTimeout = timeoutMillis - timeElapsed;
    if (checkAbort) {
      if (threadCtx_.getAbortChecker()->shouldAbort()) {
        WLOG(ERROR) << "Transfer aborted during accept " << port << " " << fd;
        return ABORT;
      }
      // wake up at least once per abort-check interval so that an abort is
      // noticed even when no connection arrives
      pollTimeout = std::min(pollTimeout, options.abort_check_interval_millis);
    }
    for (int i = 0; i < numFds; i++) {
      pollFds[i] = {listeningFds_[i], POLLIN, 0};
    }
    const int retValue = poll(pollFds, numFds, pollTimeout);
    if (retValue > 0) {
      // at least one fd has a non-zero revents
      break;
    }
    if (retValue == 0) {
      // Pure timeout. This is tested before looking at errno: poll() only
      // sets errno when it returns -1, so errno may still hold a stale EINTR
      // from some earlier call and must not be consulted here.
      WVLOG(3) << "poll() timed out on port : " << port
               << ", listening fds : " << listeningFds_;
      continue;
    }
    // retValue < 0 : errno is meaningful now
    if (errno == EINTR) {
      WVLOG(1) << "poll() call interrupted. retrying...";
      continue;
    }
    WPLOG(ERROR) << "poll() failed on port : " << port
                 << ", listening fds : " << listeningFds_;
    return CONN_ERROR;
  }
  if (lastCheckedPollIndex_ >= numFds) {
    // can happen if getaddrinfo returns different set of addresses
    lastCheckedPollIndex_ = 0;
  } else if (!tryCurAddressFirst) {
    // else try the next address
    lastCheckedPollIndex_ = (lastCheckedPollIndex_ + 1) % numFds;
  }
  // Visit every listening fd exactly once, starting at lastCheckedPollIndex_
  // and wrapping around, and accept on the first one that is readable.
  for (int count = 0; count < numFds; count++) {
    auto &pollFd = pollFds[lastCheckedPollIndex_];
    if (pollFd.revents & POLLIN) {
      struct sockaddr_storage addr;
      socklen_t addrLen = sizeof(addr);
      // from here on fd refers to the accepted connection, not the fd that
      // socket_ held on entry
      fd = accept(pollFd.fd, reinterpret_cast<struct sockaddr *>(&addr),
                  &addrLen);
      if (fd < 0) {
        WPLOG(ERROR) << "accept error";
        return CONN_ERROR;
      }
      WdtSocket::getNameInfo(reinterpret_cast<struct sockaddr *>(&addr),
                             addrLen, peerIp_, peerPort_);
      WVLOG(1) << "New connection, fd : " << fd << " from " << peerIp_ << " "
               << peerPort_;
      socket_->setFd(fd);
      socket_->setSocketTimeouts();
      socket_->setDscp(options.dscp);
      return OK;
    }
    lastCheckedPollIndex_ = (lastCheckedPollIndex_ + 1) % numFds;
  }
  // poll() reported activity, but no fd had POLLIN set in its revents
  WLOG(ERROR) << "None of the listening fds got a POLLIN event " << port;
  return CONN_ERROR;
}