in util/WdtSocket.cpp [537:588]
int64_t WdtSocket::ioWithAbortCheck(F readOrWrite, T tbuf, int64_t numBytes,
                                    int timeoutMs, bool tryFull) {
  // Repeatedly invokes readOrWrite(fd_, tbuf + doneBytes, remaining) until
  // numBytes bytes have been transferred, or until EOF, a non-retryable
  // error, an abort request or the timeout stops the loop.
  //
  // readOrWrite : callable whose negative return means error (inspected via
  //               errno), zero means EOF and positive is the number of bytes
  //               transferred by that call.
  // tbuf        : buffer pointer; each call is handed tbuf + bytes done so far.
  // numBytes    : total number of bytes to transfer.
  // timeoutMs   : overall time budget in milliseconds, measured from entry;
  //               values <= 0 disable the timeout. It is only evaluated after
  //               a call to readOrWrite has returned.
  // tryFull     : when false, return right after the first successful call
  //               even if fewer than numBytes bytes were transferred.
  //
  // Returns the number of bytes transferred. If nothing was transferred, a
  // non-retryable error returns the negative value produced by readOrWrite,
  // abort/timeout return -1, and EOF returns 0.
  WDT_CHECK(threadCtx_.getAbortChecker() != nullptr)
      << "abort checker can not be null";
  const bool checkAbort =
      (threadCtx_.getOptions().abort_check_interval_millis > 0);
  const auto startTime = Clock::now();
  int64_t doneBytes = 0;
  int retries = 0;
  while (doneBytes < numBytes) {
    const int64_t ret =
        readOrWrite(fd_, tbuf + doneBytes, numBytes - doneBytes);
    if (ret < 0) {
      // error: EINTR / EAGAIN / EWOULDBLOCK are transient and simply retried.
      // POSIX permits EAGAIN and EWOULDBLOCK to be distinct values, so both
      // are checked.
      if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
        WPLOG(ERROR) << "non-retryable error encountered during socket io "
                     << fd_ << " " << doneBytes << " " << retries;
        return (doneBytes > 0 ? doneBytes : ret);
      }
    } else if (ret == 0) {
      // eof
      WVLOG(1) << "EOF received during socket io. fd : " << fd_
               << ", finished bytes : " << doneBytes
               << ", retries : " << retries;
      return doneBytes;
    } else {
      // success
      doneBytes += ret;
      if (!tryFull) {
        // do not have to read/write entire data
        return doneBytes;
      }
    }
    // Abort and timeout are only relevant while data is still outstanding.
    // Once everything has been transferred the result is the same either way,
    // so skip the checks instead of logging a misleading abort/timeout for an
    // io that actually completed.
    if (doneBytes < numBytes) {
      if (checkAbort && threadCtx_.getAbortChecker()->shouldAbort()) {
        WLOG(ERROR) << "transfer aborted during socket io " << fd_ << " "
                    << doneBytes << " " << retries;
        return (doneBytes > 0 ? doneBytes : -1);
      }
      if (timeoutMs > 0) {
        const int duration = durationMillis(Clock::now() - startTime);
        if (duration >= timeoutMs) {
          WLOG(INFO) << "socket io timed out after " << duration
                     << " ms, retries " << retries << " fd " << fd_
                     << " doneBytes " << doneBytes;
          return (doneBytes > 0 ? doneBytes : -1);
        }
      }
    }
    retries++;
  }
  WVLOG_IF(1, retries > 1) << "socket io for " << doneBytes << " bytes took "
                           << retries << " retries";
  return doneBytes;
}