in mcrouter/lib/network/AsyncMcClientImpl.cpp [228:323]
void AsyncMcClientImpl::pushMessages() {
DestructorGuard dg(this);
assert(connectionState_ == ConnectionState::Up);
auto numToSend = getNumToSend();
// Call batch status callback
if (requestStatusCallbacks_.onWrite && numToSend > 0) {
requestStatusCallbacks_.onWrite(numToSend);
}
std::array<struct iovec, kStackIovecs> iovecs;
size_t iovsUsed = 0;
size_t batchSize = 0;
McClientRequestContextBase* tail = nullptr;
auto sendBatchFun = [this](
McClientRequestContextBase* tailReq,
const struct iovec* iov,
size_t iovCnt,
bool last) {
tailReq->isBatchTail = true;
socket_->writev(
this,
iov,
iovCnt,
last ? folly::WriteFlags::NONE : folly::WriteFlags::CORK);
return connectionState_ == ConnectionState::Up;
};
while (queue_.getPendingRequestCount() != 0 && numToSend > 0 &&
/* we might be already not UP, because of failed writev */
connectionState_ == ConnectionState::Up) {
auto& req = queue_.peekNextPending();
auto iov = req.reqContext.getIovs();
auto iovcnt = req.reqContext.getIovsCount();
if (debugFifo_.isConnected()) {
debugFifo_.startMessage(MessageDirection::Sent, req.reqContext.typeId());
debugFifo_.writeData(iov, iovcnt);
}
if (iovsUsed + iovcnt > kStackIovecs && iovsUsed) {
// We're out of inline iovecs, flush what we batched.
if (!sendBatchFun(tail, iovecs.data(), iovsUsed, false)) {
break;
}
iovsUsed = 0;
batchSize = 0;
}
if (iovcnt >= kStackIovecs || (iovsUsed == 0 && numToSend == 1)) {
// Req is either too big to batch or it's the last one, so just send it
// alone.
queue_.markNextAsSending();
sendBatchFun(&req, iov, iovcnt, numToSend == 1);
} else {
auto size = calculateIovecsTotalSize(iov, iovcnt);
if (size + batchSize > kMaxBatchSize && iovsUsed) {
// We already accumulated too much data, flush what we have.
if (!sendBatchFun(tail, iovecs.data(), iovsUsed, false)) {
break;
}
iovsUsed = 0;
batchSize = 0;
}
queue_.markNextAsSending();
if (size >= kMaxBatchSize || (iovsUsed == 0 && numToSend == 1)) {
// Req is either too big to batch or it's the last one, so just send it
// alone.
sendBatchFun(&req, iov, iovcnt, numToSend == 1);
} else {
memcpy(iovecs.data() + iovsUsed, iov, sizeof(struct iovec) * iovcnt);
iovsUsed += iovcnt;
batchSize += size;
tail = &req;
if (numToSend == 1) {
// This was the last request flush everything.
sendBatchFun(tail, iovecs.data(), iovsUsed, true);
}
}
}
--numToSend;
}
if (connectionState_ == ConnectionState::Up && pendingGoAwayReply_) {
// Note: we're not waiting for all requests to be sent, since that may take
// a while and if we didn't succeed in one loop, this means that we're
// already backlogged.
sendGoAwayReply();
}
pendingGoAwayReply_ = false;
scheduleNextWriterLoop();
}