bool Pair::write()

in gloo/transport/tcp/pair.cc [342:422]


// Writes the outstanding bytes of `op` to this pair's socket.
//
// Repeats prepareWrite() + writev(2) until the whole operation has been
// written, then calls writeComplete().
//
// Returns:
//   true  - the operation was written in full and writeComplete() ran.
//   false - the pair is CLOSED, the unbound buffer of a
//           SEND_UNBOUND_BUFFER op could not be acquired, or writev(2)
//           failed with anything other than EINTR (see below).
//
// writev(2) failure handling:
//   EINTR            - the call is retried.
//   EAGAIN           - sync mode: reported through signalException() as
//                      a write timeout; async mode: returns false
//                      without signaling.
//   ECONNRESET/EPIPE - async mode: returns false without signaling;
//                      sync mode: reported through signalException().
//   anything else    - reported through signalException().
bool Pair::write(Op& op) {
  if (state_ == CLOSED) {
    return false;
  }

  const auto opcode = op.getOpcode();

  // Acquire pointer to unbound buffer if applicable.
  NonOwningPtr<UnboundBuffer> buf;
  if (opcode == Op::SEND_UNBOUND_BUFFER) {
    buf = NonOwningPtr<UnboundBuffer>(op.ubuf);
    if (!buf) {
      return false;
    }
  }

  std::array<struct iovec, 2> iov;
  int ioc;

  for (;;) {
    // prepareWrite() is handed the iovec array and count to fill in; its
    // return value is the byte count a complete writev(2) should report.
    const auto nbytes = prepareWrite(op, buf, iov.data(), ioc);

    const ssize_t rv = writev(fd_, iov.data(), ioc);
    if (rv == -1) {
      // Snapshot errno before calling anything else. The calls below
      // (e.g. peer_.str(), which shares an argument list with
      // strerror()) are allowed to modify errno, and argument
      // evaluation order is unspecified.
      const int err = errno;

      // Interrupted before any byte was written: just try again.
      if (err == EINTR) {
        continue;
      }

      if (err == EAGAIN) {
        if (sync_) {
          // Sync mode: blocking call returning with EAGAIN indicates timeout.
          signalException(GLOO_ERROR_MSG("Write timeout ", peer_.str()));
        }
        // Async mode: can't write more than this.
        return false;
      }

      // In async mode a reset/broken connection is not signaled from
      // here. In sync mode it falls through to the generic error below.
      if ((err == ECONNRESET || err == EPIPE) && !sync_) {
        return false;
      }

      // Unexpected error
      signalException(
          GLOO_ERROR_MSG("writev ", peer_.str(), ": ", strerror(err)));
      return false;
    }

    // From write(2) man page (NOTES section):
    //
    //  If a write() is interrupted by a signal handler before any
    //  bytes are written, then the call fails with the error EINTR;
    //  if it is interrupted after at least one byte has been written,
    //  the call succeeds, and returns the number of bytes written.
    //
    // If rv < nbytes we ALWAYS retry, regardless of sync/async mode,
    // since an EINTR may or may not have happened. If this was not
    // the case, and the kernel buffer is full, the next call to
    // write(2) will return EAGAIN, which is handled appropriately.
    op.nwritten += rv;
    if (rv < nbytes) {
      continue;
    }

    // A full write must account for exactly the bytes announced in the
    // operation's preamble.
    GLOO_ENFORCE_EQ(rv, nbytes);
    GLOO_ENFORCE_EQ(op.nwritten, op.preamble.nbytes);
    break;
  }

  writeComplete(op, buf, opcode);
  return true;
}