void HTTPSession::runLoopCallback()

in proxygen/lib/http/session/HTTPSession.cpp [2152:2245]


// Loop callback for the session. In order, it:
//   1. Pulls up to kMaxWritesPerLoop buffers from getNextToSend() and hands
//      each one to the socket via writeChain(), stopping early as soon as a
//      write is still counted as active after writeChain() returns.
//   2. Re-schedules itself via scheduleWrite() if nothing is in flight but
//      more egress is available and allowed.
//   3. If reads are unpaused, calls processReadData() and makes sure a read
//      callback is installed on the socket.
// A scope guard performs the end-of-callback bookkeeping
// (updatePendingWrites(), the rate-limit check and checkForShutdown()) on
// every exit path, including the early return inside the write loop.
void HTTPSession::runLoopCallback() noexcept {
  // We schedule this callback to run at the end of an event
  // loop iteration if either of two conditions has happened:
  //   * The session has generated some egress data (see scheduleWrite())
  //   * Reads have become unpaused (see resumeReads())
  // NOTE(review): dg presumably keeps the session from being destroyed while
  // this callback (and the scope guard below) is still running -- confirm
  // against folly's DelayedDestruction.
  DestructorGuard dg(this);
  inLoopCallback_ = true;
  // Declared after dg, so it is destroyed (and therefore runs) before dg is
  // released.
  auto scopeg = folly::makeGuard([this] {
    inLoopCallback_ = false;
    // This ScopeGuard needs to be under the above DestructorGuard
    updatePendingWrites();
    // Only when no more writes are pending, let every transaction run its
    // upstream egress rate-limit check.
    if (!hasMoreWrites()) {
      invokeOnAllTransactions([](HTTPTransaction* txn) {
        txn->checkIfEgressRateLimitedByUpstream();
      });
    }
    checkForShutdown();
  });
  VLOG(5) << *this << " in loop callback";

  // Each iteration attempts one socket write; the loop is bounded by
  // kMaxWritesPerLoop.
  for (uint32_t i = 0; i < kMaxWritesPerLoop; ++i) {
    // Reset the per-buffer body byte counter before building the next buffer.
    // NOTE(review): presumably getNextToSend() accumulates into it -- confirm.
    bodyBytesPerWriteBuf_ = 0;
    if (isPrioritySampled()) {
      // Report the current number of pending-egress entries in the egress
      // queue to every transaction.
      invokeOnAllTransactions([this](HTTPTransaction* txn) {
        txn->updateContentionsCount(txnEgressQueue_.numPendingEgress());
      });
    }

    // Passed by pointer to getNextToSend(), which may overwrite them; the
    // values are then translated into socket write flags below.
    bool cork = true;
    bool timestampTx = false;
    bool timestampAck = false;
    unique_ptr<IOBuf> writeBuf =
        getNextToSend(&cork, &timestampTx, &timestampAck);

    // Nothing to send: leave the write loop and fall through to the
    // reschedule / read handling below.
    if (!writeBuf) {
      break;
    }
    uint64_t len = writeBuf->computeChainDataLength();
    VLOG(11) << *this << " bytes of egress to be written: " << len
             << " cork:" << cork << " timestampTx:" << timestampTx
             << " timestampAck:" << timestampAck;
    // A non-null but empty buffer ends the whole callback: the reschedule and
    // read handling below are skipped. The scope guard still runs on this
    // return, so checkForShutdown() is invoked a second time from there.
    if (len == 0) {
      checkForShutdown();
      return;
    }

    if (isPrioritySampled()) {
      // Report the body byte count of this write buffer to every transaction.
      invokeOnAllTransactions([this](HTTPTransaction* txn) {
        txn->updateSessionBytesSheduled(bodyBytesPerWriteBuf_);
      });
    }

    // Translate the three booleans into WriteFlags. Note that timestampAck is
    // conveyed via the EOR flag.
    folly::WriteFlags flags = folly::WriteFlags::NONE;
    flags |= (cork) ? folly::WriteFlags::CORK : folly::WriteFlags::NONE;
    flags |= (timestampTx) ? folly::WriteFlags::TIMESTAMP_TX
                           : folly::WriteFlags::NONE;
    flags |= (timestampAck) ? folly::WriteFlags::EOR : folly::WriteFlags::NONE;
    // At most one write may be pending at a time; record this one along with
    // its length and a DestructorGuard on the session.
    CHECK(!pendingWrite_.hasValue());
    pendingWrite_.emplace(len, DestructorGuard(this));

    if (!writeTimeout_.isScheduled()) {
      // Any performance concern here?
      wheelTimer_.scheduleTimeout(&writeTimeout_);
    }
    // Bookkeeping is updated before writeChain() is called.
    // NOTE(review): presumably because the write callbacks can fire
    // synchronously from inside writeChain() and adjust these -- confirm.
    numActiveWrites_++;
    VLOG(4) << *this << " writing " << len
            << ", activeWrites=" << numActiveWrites_ << " cork:" << cork
            << " timestampTx:" << timestampTx
            << " timestampAck:" << timestampAck;
    bytesScheduled_ += len;
    sock_->writeChain(this, std::move(writeBuf), flags);
    // A write is still counted as active after writeChain() returned: update
    // the counters and stop writing for this callback. Otherwise loop around
    // and try to write another buffer.
    if (numActiveWrites_ > 0) {
      updateWriteCount();
      HTTPSessionBase::notifyEgressBodyBuffered(len, false);
      // NOTE(review): the original comment here said "updateWriteBufSize
      // called in scope guard"; the guard visibly calls updatePendingWrites(),
      // which presumably performs that update -- confirm.
      break;
    }
    // writeChain can result in a writeError and trigger the shutdown code path
  }
  // Schedule another write pass only if no write is in flight, writes have
  // not been shut down, there is more to write, and connection-level flow
  // control (when present) reports available send capacity.
  if (numActiveWrites_ == 0 && !writesShutdown() && hasMoreWrites() &&
      (!connFlowControl_ || connFlowControl_->getAvailableSend())) {
    scheduleWrite();
  }

  if (readsUnpaused()) {
    processReadData();

    // Install the read callback if necessary
    // (readsUnpaused() is re-checked because processReadData() has run in
    // between.)
    if (readsUnpaused() && !sock_->getReadCallback()) {
      sock_->setReadCB(this);
    }
  }
  // checkForShutdown is now in ScopeGuard
}