in mcrouter/lib/network/McParser.cpp [126:224]
bool McParser::readCaretData() {
while (readBuffer_.length() > 0) {
// Parse header
ParseStatus parseStatus;
parseStatus =
caretParseHeader(readBuffer_.data(), readBuffer_.length(), msgInfo_);
if (parseStatus == ParseStatus::NotEnoughData) {
return true;
}
if (parseStatus != ParseStatus::Ok) {
callback_.parseError(
carbon::Result::REMOTE_ERROR,
folly::sformat(
"Error parsing {} header", mc_protocol_to_string(protocol_)));
return false;
}
const auto messageSize = msgInfo_.headerSize + msgInfo_.bodySize;
// Case 0: There was an overflow. Return an error and let mcrouter close the
// connection.
if (messageSize == 0) {
LOG(ERROR)
<< "Got a 0 size message, likely due to overflow. Returning a parse error.";
return false;
}
// Parse message body
// Case 1: Entire message (and possibly part of next) is in the buffer
if (readBuffer_.length() >= messageSize) {
if (UNLIKELY(debugFifo_ && debugFifo_->isConnected())) {
debugFifo_->startMessage(MessageDirection::Received, msgInfo_.typeId);
debugFifo_->writeData(readBuffer_.writableData(), messageSize);
}
bool cbStatus;
cbStatus = callback_.caretMessageReady(msgInfo_, readBuffer_);
if (!cbStatus) {
readBuffer_.clear();
return false;
}
readBuffer_.trimStart(messageSize);
continue;
}
// Case 2: We don't have full header, so return to wait for more data
if (readBuffer_.length() < msgInfo_.headerSize) {
return true;
}
// Case 3: We have the full header, but not the full body. If needed,
// reallocate into a buffer large enough for full header and body. Then
// return to wait for remaining data.
if (readBuffer_.length() + readBuffer_.tailroom() < messageSize) {
assert(!readBuffer_.isChained());
if (messageSize > kMaxBodySize) {
LOG(ERROR) << "Body size was " << messageSize
<< ", but max size allowed is " << kMaxBodySize;
return false;
}
readBuffer_.unshareOne();
bufferSize_ = std::max<size_t>(bufferSize_, messageSize);
readBuffReserve(bufferSize_ - readBuffer_.length());
}
#ifdef FOLLY_JEMALLOC_NODUMP_ALLOCATOR_SUPPORTED
// This is an ugly patch to handle the case that somehow the readBuf got
// reallocated. This could happen for
// instance by calling IOBuf::reserve or IOBuf::unshare
if (useJemallocNodumpAllocator_ &&
readBuffer_.getFreeFn() != noDumpDeallocate) {
readBuffer_ =
copyToNodumpBuffer(readBuffer_, readBuffer_.length() + bufferSize_);
}
#endif
return true;
}
// We parsed everything, read buffer is empty.
// Try to shrink it to reduce memory footprint
// TODO: should compare the readbuffer capacity not bufferSize
if (bufferSize_ > maxBufferSize_) {
auto curCycles = cycles::getCpuCycles();
if (curCycles > lastShrinkCycles_ + kAdjustBufferSizeCpuCycles) {
lastShrinkCycles_ = curCycles;
bufferSize_ = maxBufferSize_;
#ifdef FOLLY_JEMALLOC_NODUMP_ALLOCATOR_SUPPORTED
if (useJemallocNodumpAllocator_) {
readBuffer_ = copyToNodumpBuffer(readBuffer_, bufferSize_);
return true;
}
#endif
readBuffer_ = folly::IOBuf(folly::IOBuf::CREATE, bufferSize_);
}
}
return true;
}