unique_ptr HTTPSession::getNextToSend()

in proxygen/lib/http/session/HTTPSession.cpp [2055:2150]


unique_ptr<IOBuf> HTTPSession::getNextToSend(bool* cork,
                                             bool* timestampTx,
                                             bool* timestampAck) {
  // limit ourselves to one outstanding write at a time (onWriteSuccess calls
  // scheduleWrite)
  if (numActiveWrites_ > 0 || writesShutdown()) {
    VLOG(4) << "skipping write during this loop, numActiveWrites_="
            << numActiveWrites_ << " writesShutdown()=" << writesShutdown();
    return nullptr;
  }

  // We always tack on at least one body packet to the current write buf
  // This ensures that a short HTTPS response will go out in a single SSL record
  while (!txnEgressQueue_.empty()) {
    uint32_t toSend = kWriteReadyMax;
    if (connFlowControl_) {
      if (connFlowControl_->getAvailableSend() == 0) {
        VLOG(4) << "Session-level send window is full, skipping remaining "
                << "body writes this loop";
        break;
      }
      toSend = std::min(toSend, connFlowControl_->getAvailableSend());
    }
    txnEgressQueue_.nextEgress(nextEgressResults_, false);
    CHECK(!nextEgressResults_.empty()); // Queue was non empty, so this must be
    // The maximum we will send for any transaction in this loop
    uint32_t txnMaxToSend = toSend * nextEgressResults_.front().second;
    if (txnMaxToSend == 0) {
      // toSend is smaller than the number of transactions.  Give all egress
      // to the first transaction
      nextEgressResults_.erase(++nextEgressResults_.begin(),
                               nextEgressResults_.end());
      txnMaxToSend = std::min(toSend, egressBodySizeLimit_);
      nextEgressResults_.front().second = 1;
    }
    if (nextEgressResults_.size() > 1 && txnMaxToSend > egressBodySizeLimit_) {
      // Cap the max to egressBodySizeLimit_, and recompute toSend accordingly
      txnMaxToSend = egressBodySizeLimit_;
      toSend = txnMaxToSend / nextEgressResults_.front().second;
    }
    // split allowed by relative weight, with some minimum
    for (auto txnPair : nextEgressResults_) {
      uint32_t txnAllowed = txnPair.second * toSend;
      if (nextEgressResults_.size() > 1) {
        CHECK_LE(txnAllowed, egressBodySizeLimit_);
      }
      if (connFlowControl_) {
        CHECK_LE(txnAllowed, connFlowControl_->getAvailableSend());
      }
      if (txnAllowed == 0) {
        // The ratio * toSend was so small this txn gets nothing.
        VLOG(4) << *this << " breaking egress loop on 0 txnAllowed";
        break;
      }

      VLOG(4) << *this << " egressing txnID=" << txnPair.first->getID()
              << " allowed=" << txnAllowed;
      txnPair.first->onWriteReady(txnAllowed, txnPair.second);
    }
    nextEgressResults_.clear();
    // it can be empty because of HTTPTransaction rate limiting.  We should
    // change rate limiting to clearPendingEgress while waiting.
    if (!writeBuf_.empty()) {
      break;
    }
  }
  *timestampTx = false;
  *timestampAck = false;
  if (byteEventTracker_) {
    uint64_t needed = byteEventTracker_->preSend(
        cork, timestampTx, timestampAck, bytesWritten_);
    if (needed > 0) {
      VLOG(5) << *this
              << " writeBuf_.chainLength(): " << writeBuf_.chainLength()
              << " txnEgressQueue_.empty(): " << txnEgressQueue_.empty();

      if (needed < writeBuf_.chainLength()) {
        // split the next SOM / EOM chunk
        VLOG(5) << *this << " splitting " << needed << " bytes out of a "
                << writeBuf_.chainLength() << " bytes IOBuf";
        *cork = true;
        if (sessionStats_) {
          sessionStats_->recordPresendIOSplit();
        }
        writeBufSplit_ = true;
        return writeBuf_.split(needed);
      } else {
        CHECK_EQ(needed, writeBuf_.chainLength());
      }
    }
  }

  // cork if there are txns with pending egress and room to send them
  *cork = !txnEgressQueue_.empty() && !isConnWindowFull();
  return writeBuf_.move();
}