int64_t WdtSocket::ioWithAbortCheck()

in util/WdtSocket.cpp [537:588]


int64_t WdtSocket::ioWithAbortCheck(F readOrWrite, T tbuf, int64_t numBytes,
                                    int timeoutMs, bool tryFull) {
  WDT_CHECK(threadCtx_.getAbortChecker() != nullptr)
      << "abort checker can not be null";
  bool checkAbort = (threadCtx_.getOptions().abort_check_interval_millis > 0);
  auto startTime = Clock::now();
  int64_t doneBytes = 0;
  int retries = 0;
  while (doneBytes < numBytes) {
    const int64_t ret =
        readOrWrite(fd_, tbuf + doneBytes, numBytes - doneBytes);
    if (ret < 0) {
      // error
      if (errno != EINTR && errno != EAGAIN) {
        WPLOG(ERROR) << "non-retryable error encountered during socket io "
                     << fd_ << " " << doneBytes << " " << retries;
        return (doneBytes > 0 ? doneBytes : ret);
      }
    } else if (ret == 0) {
      // eof
      WVLOG(1) << "EOF received during socket io. fd : " << fd_
               << ", finished bytes : " << doneBytes
               << ", retries : " << retries;
      return doneBytes;
    } else {
      // success
      doneBytes += ret;
      if (!tryFull) {
        // do not have to read/write entire data
        return doneBytes;
      }
    }
    if (checkAbort && threadCtx_.getAbortChecker()->shouldAbort()) {
      WLOG(ERROR) << "transfer aborted during socket io " << fd_ << " "
                  << doneBytes << " " << retries;
      return (doneBytes > 0 ? doneBytes : -1);
    }
    if (timeoutMs > 0) {
      int duration = durationMillis(Clock::now() - startTime);
      if (duration >= timeoutMs) {
        WLOG(INFO) << "socket io timed out after " << duration
                   << " ms, retries " << retries << " fd " << fd_
                   << " doneBytes " << doneBytes;
        return (doneBytes > 0 ? doneBytes : -1);
      }
    }
    retries++;
  }
  WVLOG_IF(1, retries > 1) << "socket io for " << doneBytes << " bytes took "
                           << retries << " retries";
  return doneBytes;
}