uint64_t HQSession::requestStreamWriteImpl()

in proxygen/lib/http/session/HQSession.cpp [2042:2169]


uint64_t HQSession::requestStreamWriteImpl(HQStreamTransportBase* hqStream,
                                           uint64_t maxEgress,
                                           double ratio) {
  CHECK(hqStream->queueHandle_.isStreamTransportEnqueued());
  HTTPTransaction::DestructorGuard dg(&hqStream->txn_);

  auto streamId = hqStream->getStreamId();
  auto flowControl = sock_->getStreamFlowControl(streamId);
  if (flowControl.hasError()) {
    LOG(ERROR)
        << "Got error=" << flowControl.error() << " streamID=" << streamId
        << " detached=" << hqStream->detached_
        << " readBufLen=" << static_cast<int>(hqStream->readBuf_.chainLength())
        << " writeBufLen=" << static_cast<int>(hqStream->writeBufferSize())
        << " readEOF=" << hqStream->readEOF_
        << " ingressError_=" << static_cast<int>(hqStream->ingressError_)
        << " eomGate_=" << hqStream->eomGate_;
    handleWriteError(hqStream, flowControl.error());
    return 0;
  }

  auto streamSendWindow = flowControl->sendWindowAvailable;

  size_t canSend = std::min(streamSendWindow, maxEgress);

  // we may have already buffered more than the amount the transport can take,
  // or the txn may not have any more body bytes/EOM to add. In case, there is
  // no need to call txn->onWriteReady.
  if (hqStream->wantsOnWriteReady(canSend)) {
    // Populate the write buffer by telling the transaction how much
    // room is available for body data
    size_t maxBodySend = canSend - hqStream->writeBufferSize();
    VLOG(4) << __func__ << " asking txn for more bytes sess=" << *this
            << ": streamID=" << streamId << " canSend=" << canSend
            << " remain=" << hqStream->writeBufferSize()
            << " pendingEOM=" << hqStream->pendingEOM_
            << " maxBodySend=" << maxBodySend << " ratio=" << ratio;
    hqStream->txn_.onWriteReady(maxBodySend, ratio);
    // onWriteReady may not be able to detach any byte from the deferred egress
    // body bytes, in case it's getting rate limited.
    // In that case the txn will get removed from the egress queue from
    // onWriteReady
    if (!hqStream->hasWriteBuffer() && !hqStream->pendingEOM_) {
      return 0;
    }
  }

  auto bufWritter = [&](quic::StreamId streamId,
                        std::unique_ptr<folly::IOBuf> data,
                        bool sendEof,
                        quic::QuicSocket::DeliveryCallback* deliveryCallback) {
    return sock_->writeChain(
        streamId, std::move(data), sendEof, deliveryCallback);
  };
  auto bufMetaWritter =
      [&](quic::StreamId streamId,
          quic::BufferMeta bufMeta,
          bool sendEof,
          quic::QuicSocket::DeliveryCallback* deliveryCallback) {
        return sock_->writeBufMeta(
            streamId, bufMeta, sendEof, deliveryCallback);
      };

  size_t sent = 0;
  auto bufSendLen = std::min(canSend, hqStream->writeBuf_.chainLength());
  auto tryWriteBuf = hqStream->writeBuf_.splitAtMost(canSend);
  bool sendEof = (hqStream->pendingEOM_ && !hqStream->hasPendingBody());
  if (bufSendLen > 0 || sendEof) {
    VLOG(4) << __func__ << " before write sess=" << *this
            << ": streamID=" << streamId << " maxEgress=" << maxEgress
            << " sendWindow=" << streamSendWindow
            << " tryToSend=" << tryWriteBuf->computeChainDataLength()
            << " sendEof=" << sendEof;
    sent = handleWrite(std::move(bufWritter),
                       hqStream,
                       std::move(tryWriteBuf),
                       bufSendLen,
                       sendEof);
  }
  auto bufMetaWriteLen =
      std::min(canSend - bufSendLen, hqStream->bufMeta_.length);
  auto splitBufMeta = hqStream->bufMeta_.split(bufMetaWriteLen);
  // Refresh sendEof after previous write and the bufMEta split.
  sendEof = (hqStream->pendingEOM_ && !hqStream->hasPendingBody());
  if (sendEof || splitBufMeta.length > 0) {
    quic::BufferMeta quicBufMeta(splitBufMeta.length);
    sent += handleWrite(std::move(bufMetaWritter),
                        hqStream,
                        quicBufMeta,
                        quicBufMeta.length,
                        sendEof);
  }

  VLOG(4) << __func__ << " after write sess=" << *this
          << ": streamID=" << streamId << " sent=" << sent
          << " buflen=" << hqStream->writeBufferSize()
          << " hasPendingBody=" << hqStream->txn_.hasPendingBody()
          << " EOM=" << hqStream->pendingEOM_;
  if (infoCallback_) {
    infoCallback_->onWrite(*this, sent);
  }
  CHECK_GE(maxEgress, sent);

  bool flowControlBlocked = (sent == streamSendWindow && !sendEof);
  if (flowControlBlocked) {
    // TODO: this one doesn't create trouble, but it's certainly not logging the
    // extra params anyway.
    if (sock_ && sock_->getState() && sock_->getState()->qLogger) {
      sock_->getState()->qLogger->addStreamStateUpdate(
          streamId,
          quic::kStreamBlocked,
          std::chrono::duration_cast<std::chrono::milliseconds>(
              std::chrono::steady_clock::now() - hqStream->createdTime));
    }
  }
  // sendAbort can clear the egress queue, so this stream may no longer be
  // enqueued
  if (hqStream->queueHandle_.isStreamTransportEnqueued() &&
      (!hqStream->hasPendingEgress() || flowControlBlocked)) {
    VLOG(4) << "clearPendingEgress for " << hqStream->txn_;
    txnEgressQueue_.clearPendingEgress(hqStream->queueHandle_.getHandle());
  }
  if (flowControlBlocked && !hqStream->txn_.isEgressComplete()) {
    VLOG(4) << __func__ << " txn flow control blocked, txn=" << hqStream->txn_;
    hqStream->txn_.pauseEgress();
  }
  return sent;
}