void AsyncSocket::writeImpl()

in folly/io/async/AsyncSocket.cpp [1621:1766]


// Common write path: try to send the iovec array immediately when the socket
// is writable and nothing is queued ahead of it; otherwise (or for whatever
// remains after a partial write) queue a WriteRequest and, if needed,
// register for write events.
//
// Parameters:
//   callback   - may be null.  writeSuccess() is invoked on it when the whole
//                write completes immediately; failures are routed through
//                failWrite() / invalidState().
//   vec, count - array of `count` iovec entries to send.
//   buf        - optional IOBuf whose ownership is taken over by this call.
//                NOTE(review): presumably the buffer backing `vec` -- confirm
//                against callers.
//   totalBytes - added to totalAppBytesScheduledForWrite_.
//                NOTE(review): presumably the sum of the iovec lengths --
//                confirm against callers.
//   flags      - forwarded to performWrite() and to the queued request; also
//                inspected via isZeroCopyRequest().
void AsyncSocket::writeImpl(
    WriteCallback* callback,
    const iovec* vec,
    size_t count,
    unique_ptr<IOBuf>&& buf,
    size_t totalBytes,
    WriteFlags flags) {
  VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
          << ", callback=" << callback << ", count=" << count
          << ", state=" << state_;
  // NOTE(review): presumably keeps this socket alive until the end of this
  // scope, since the callbacks invoked below may drop the last reference to
  // it -- confirm against DestructorGuard's documentation.
  DestructorGuard dg(this);
  // Take ownership of the caller's buffer right away so that every exit path
  // below deals with a single local owner.
  unique_ptr<IOBuf> ioBuf(std::move(buf));
  eventBase_->dcheckIsInEventBaseThread();

  auto* releaseIOBufCallback =
      callback ? callback->getReleaseIOBufCallback() : nullptr;

  // On every exit path, pass ioBuf to releaseIOBuf().  Several paths below
  // hand ioBuf to another call via std::move first.
  // NOTE(review): this relies on releaseIOBuf() tolerating an empty pointer
  // on the paths where ioBuf has already been given away -- confirm.
  SCOPE_EXIT { releaseIOBuf(std::move(ioBuf), releaseIOBufCallback); };

  // Accounting happens before any validity check, so these counters are
  // bumped even for writes that are rejected below.
  totalAppBytesScheduledForWrite_ += totalBytes;
  if (ioBuf) {
    allocatedBytesBuffered_ += ioBuf->computeChainCapacity();
  }

  if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
    // No new writes may be performed after the write side of the socket has
    // been shutdown.
    //
    // We could just call callback->writeError() here to fail just this write.
    // However, fail hard and use invalidState() to fail all outstanding
    // callbacks and move the socket into the error state.  There's most likely
    // a bug in the caller's code, so we abort everything rather than trying to
    // proceed as best we can.
    return invalidState(callback);
  }

  // Progress of the immediate write attempt.  All stay zero when no attempt
  // is made, in which case the entire request is queued below.
  uint32_t countWritten = 0;
  uint32_t partialWritten = 0;
  ssize_t bytesWritten = 0;
  // Set only when an immediate write was attempted, did not finish, and the
  // socket is still not connecting afterwards.
  bool mustRegister = false;
  if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
      !connecting()) {
    if (writeReqHead_ == nullptr) {
      // If we are established and there are no other writes pending,
      // we can attempt to perform the write immediately.
      assert(writeReqTail_ == nullptr);
      assert((eventFlags_ & EventHandler::WRITE) == 0);

      auto writeResult = performWrite(
          vec, uint32_t(count), flags, &countWritten, &partialWritten);
      bytesWritten = writeResult.writeReturn;
      if (bytesWritten < 0) {
        // Capture errno first so that nothing called afterwards can clobber
        // it before it is attached to the exception.
        auto errnoCopy = errno;
        // Prefer the exception supplied by performWrite(), when there is one,
        // over the generic errno-based one built below.
        if (writeResult.exception) {
          return failWrite(__func__, callback, 0, *writeResult.exception);
        }
        AsyncSocketException ex(
            AsyncSocketException::INTERNAL_ERROR,
            withAddr("writev failed"),
            errnoCopy);
        return failWrite(__func__, callback, 0, ex);
      } else if (countWritten == count) {
        // done, add the whole buffer
        // Zero-copy path: ioBuf is handed to addZeroCopyBuf() instead of
        // being released here.
        // NOTE(review): presumably because the buffer must outlive this call
        // until the kernel is done with it -- confirm.
        if (countWritten && isZeroCopyRequest(flags)) {
          addZeroCopyBuf(std::move(ioBuf), releaseIOBufCallback);
        } else {
          releaseIOBuf(std::move(ioBuf), releaseIOBufCallback);
        }

        // We successfully wrote everything.
        // Invoke the callback and return.
        if (callback) {
          callback->writeSuccess();
        }
        return;
      } else { // continue writing the next writeReq
        // add just the ptr
        // Only the raw pointer is passed here; ownership stays with ioBuf,
        // which is moved into the WriteRequest created below.
        if (bytesWritten && isZeroCopyRequest(flags)) {
          addZeroCopyBuf(ioBuf.get());
        }
      }
      if (!connecting()) {
        // Writes might put the socket back into connecting state
        // if TFO is enabled, and using TFO fails.
        // This means that write timeouts would not be active, however
        // connect timeouts would affect this stage.
        mustRegister = true;
      }
    }
  } else if (!connecting()) {
    // Invalid state for writing
    return invalidState(callback);
  }
  // Reaching this point means one of: the socket is still connecting, other
  // writes are already queued, or the immediate write was only partial.

  // Create a new WriteRequest to add to the queue
  // The request covers only what has not been written yet: the iovec array
  // is advanced past the countWritten entries already handled.
  // NOTE(review): partialWritten is presumably the byte offset into the
  // first remaining iovec -- confirm against performWrite().
  WriteRequest* req;
  try {
    req = BytesWriteRequest::newRequest(
        this,
        callback,
        vec + countWritten,
        uint32_t(count - countWritten),
        partialWritten,
        uint32_t(bytesWritten),
        std::move(ioBuf),
        flags);
  } catch (const std::exception& ex) {
    // we mainly expect to catch std::bad_alloc here
    AsyncSocketException tex(
        AsyncSocketException::INTERNAL_ERROR,
        withAddr(string("failed to append new WriteRequest: ") + ex.what()));
    return failWrite(__func__, callback, size_t(bytesWritten), tex);
  }
  // NOTE(review): the effect of consume() is not visible from this function;
  // see WriteRequest for its contract.
  req->consume();
  // Append the request at the tail of the pending-write list.
  if (writeReqTail_ == nullptr) {
    assert(writeReqHead_ == nullptr);
    writeReqHead_ = writeReqTail_ = req;
  } else {
    writeReqTail_->append(req);
    writeReqTail_ = req;
  }

  // Tell the buffer callback, if one is installed, that data has been queued
  // rather than written out immediately.
  if (bufferCallback_) {
    bufferCallback_->onEgressBuffered();
  }

  // Register for write events if are established and not currently
  // waiting on write events
  if (mustRegister) {
    assert(state_ == StateEnum::ESTABLISHED);
    assert((eventFlags_ & EventHandler::WRITE) == 0);
    if (!updateEventRegistration(EventHandler::WRITE, 0)) {
      // Per the assertion below, a failed registration is expected to have
      // already moved the socket into the ERROR state, so there is nothing
      // further to do here.
      assert(state_ == StateEnum::ERROR);
      return;
    }
    if (sendTimeout_ > 0) {
      // Schedule a timeout to fire if the write takes too long.
      if (!writeTimeout_.scheduleTimeout(sendTimeout_)) {
        AsyncSocketException ex(
            AsyncSocketException::INTERNAL_ERROR,
            withAddr("failed to schedule send timeout"));
        // This uses the failWrite() overload that takes no callback; the
        // request is already on the queue at this point.
        // NOTE(review): presumably this overload fails the queued writes --
        // confirm.
        return failWrite(__func__, ex);
      }
    }
  }
}