// AsyncMcClientImpl::pushMessages()
// Extracted from mcrouter/lib/network/AsyncMcClientImpl.cpp [228:323]


// Drains up to getNumToSend() pending requests from queue_ and writes them to
// the socket, coalescing small requests into batched writev() calls of at most
// kStackIovecs iovec entries / kMaxBatchSize bytes. Every write except the
// final one is issued with WriteFlags::CORK so the kernel may merge them.
void AsyncMcClientImpl::pushMessages() {
  // Keep this object alive for the duration of the call: writev() may invoke
  // callbacks inline that could otherwise destroy us mid-loop.
  DestructorGuard dg(this);

  assert(connectionState_ == ConnectionState::Up);
  auto numToSend = getNumToSend();
  // Call batch status callback
  if (requestStatusCallbacks_.onWrite && numToSend > 0) {
    requestStatusCallbacks_.onWrite(numToSend);
  }

  // Stack-allocated scratch space used to accumulate iovecs of several small
  // requests into one batch.
  std::array<struct iovec, kStackIovecs> iovecs;
  size_t iovsUsed = 0;   // entries of `iovecs` currently filled
  size_t batchSize = 0;  // total bytes accumulated in the current batch
  McClientRequestContextBase* tail = nullptr;  // last request in current batch

  // Writes one batch to the socket. `tailReq` is the last request included in
  // the batch and gets marked as its tail; the write is CORKed unless `last`
  // is true. Returns false when the write tore down the connection (writev
  // can fail inline and change connectionState_), telling the caller to stop.
  auto sendBatchFun = [this](
                          McClientRequestContextBase* tailReq,
                          const struct iovec* iov,
                          size_t iovCnt,
                          bool last) {
    tailReq->isBatchTail = true;
    socket_->writev(
        this,
        iov,
        iovCnt,
        last ? folly::WriteFlags::NONE : folly::WriteFlags::CORK);
    return connectionState_ == ConnectionState::Up;
  };

  while (queue_.getPendingRequestCount() != 0 && numToSend > 0 &&
         /* we might be already not UP, because of failed writev */
         connectionState_ == ConnectionState::Up) {
    auto& req = queue_.peekNextPending();

    auto iov = req.reqContext.getIovs();
    auto iovcnt = req.reqContext.getIovsCount();
    // Mirror the outgoing request data into the debug fifo when connected.
    if (debugFifo_.isConnected()) {
      debugFifo_.startMessage(MessageDirection::Sent, req.reqContext.typeId());
      debugFifo_.writeData(iov, iovcnt);
    }

    if (iovsUsed + iovcnt > kStackIovecs && iovsUsed) {
      // We're out of inline iovecs, flush what we batched.
      if (!sendBatchFun(tail, iovecs.data(), iovsUsed, false)) {
        break;
      }
      iovsUsed = 0;
      batchSize = 0;
    }

    if (iovcnt >= kStackIovecs || (iovsUsed == 0 && numToSend == 1)) {
      // Req is either too big to batch or it's the last one, so just send it
      // alone.
      queue_.markNextAsSending();
      // Return value intentionally ignored: the loop condition re-checks
      // connectionState_ before the next iteration.
      sendBatchFun(&req, iov, iovcnt, numToSend == 1);
    } else {
      auto size = calculateIovecsTotalSize(iov, iovcnt);

      if (size + batchSize > kMaxBatchSize && iovsUsed) {
        // We already accumulated too much data, flush what we have.
        if (!sendBatchFun(tail, iovecs.data(), iovsUsed, false)) {
          break;
        }
        iovsUsed = 0;
        batchSize = 0;
      }

      queue_.markNextAsSending();
      if (size >= kMaxBatchSize || (iovsUsed == 0 && numToSend == 1)) {
        // Req is either too big to batch or it's the last one, so just send it
        // alone.
        sendBatchFun(&req, iov, iovcnt, numToSend == 1);
      } else {
        // Accumulate this request's iovecs into the stack batch buffer.
        memcpy(iovecs.data() + iovsUsed, iov, sizeof(struct iovec) * iovcnt);
        iovsUsed += iovcnt;
        batchSize += size;
        tail = &req;

        if (numToSend == 1) {
          // This was the last request flush everything.
          sendBatchFun(tail, iovecs.data(), iovsUsed, true);
        }
      }
    }

    --numToSend;
  }
  if (connectionState_ == ConnectionState::Up && pendingGoAwayReply_) {
    // Note: we're not waiting for all requests to be sent, since that may take
    // a while and if we didn't succeed in one loop, this means that we're
    // already backlogged.
    sendGoAwayReply();
  }
  // Cleared unconditionally: if the connection dropped before we could reply,
  // the pending GoAway reply is discarded rather than carried over.
  pendingGoAwayReply_ = false;
  scheduleNextWriterLoop();
}