bool McParser::readCaretData()

in mcrouter/lib/network/McParser.cpp [126:224]


bool McParser::readCaretData() {
  while (readBuffer_.length() > 0) {
    // Parse header
    ParseStatus parseStatus;
    parseStatus =
        caretParseHeader(readBuffer_.data(), readBuffer_.length(), msgInfo_);

    if (parseStatus == ParseStatus::NotEnoughData) {
      return true;
    }

    if (parseStatus != ParseStatus::Ok) {
      callback_.parseError(
          carbon::Result::REMOTE_ERROR,
          folly::sformat(
              "Error parsing {} header", mc_protocol_to_string(protocol_)));
      return false;
    }

    const auto messageSize = msgInfo_.headerSize + msgInfo_.bodySize;

    // Case 0: There was an overflow. Return an error and let mcrouter close the
    // connection.
    if (messageSize == 0) {
      LOG(ERROR)
          << "Got a 0 size message, likely due to overflow. Returning a parse error.";
      return false;
    }

    // Parse message body
    // Case 1: Entire message (and possibly part of next) is in the buffer
    if (readBuffer_.length() >= messageSize) {
      if (UNLIKELY(debugFifo_ && debugFifo_->isConnected())) {
        debugFifo_->startMessage(MessageDirection::Received, msgInfo_.typeId);
        debugFifo_->writeData(readBuffer_.writableData(), messageSize);
      }

      bool cbStatus;
      cbStatus = callback_.caretMessageReady(msgInfo_, readBuffer_);

      if (!cbStatus) {
        readBuffer_.clear();
        return false;
      }
      readBuffer_.trimStart(messageSize);
      continue;
    }

    // Case 2: We don't have full header, so return to wait for more data
    if (readBuffer_.length() < msgInfo_.headerSize) {
      return true;
    }

    // Case 3: We have the full header, but not the full body. If needed,
    // reallocate into a buffer large enough for full header and body. Then
    // return to wait for remaining data.
    if (readBuffer_.length() + readBuffer_.tailroom() < messageSize) {
      assert(!readBuffer_.isChained());
      if (messageSize > kMaxBodySize) {
        LOG(ERROR) << "Body size was " << messageSize
                   << ", but max size allowed is " << kMaxBodySize;
        return false;
      }
      readBuffer_.unshareOne();
      bufferSize_ = std::max<size_t>(bufferSize_, messageSize);
      readBuffReserve(bufferSize_ - readBuffer_.length());
    }
#ifdef FOLLY_JEMALLOC_NODUMP_ALLOCATOR_SUPPORTED
    // This is an ugly patch to handle the case that somehow the readBuf got
    // reallocated. This could happen for
    // instance by calling IOBuf::reserve or IOBuf::unshare
    if (useJemallocNodumpAllocator_ &&
        readBuffer_.getFreeFn() != noDumpDeallocate) {
      readBuffer_ =
          copyToNodumpBuffer(readBuffer_, readBuffer_.length() + bufferSize_);
    }
#endif
    return true;
  }

  // We parsed everything, read buffer is empty.
  // Try to shrink it to reduce memory footprint
  // TODO: should compare the readbuffer capacity not bufferSize
  if (bufferSize_ > maxBufferSize_) {
    auto curCycles = cycles::getCpuCycles();
    if (curCycles > lastShrinkCycles_ + kAdjustBufferSizeCpuCycles) {
      lastShrinkCycles_ = curCycles;
      bufferSize_ = maxBufferSize_;
#ifdef FOLLY_JEMALLOC_NODUMP_ALLOCATOR_SUPPORTED
      if (useJemallocNodumpAllocator_) {
        readBuffer_ = copyToNodumpBuffer(readBuffer_, bufferSize_);
        return true;
      }
#endif
      readBuffer_ = folly::IOBuf(folly::IOBuf::CREATE, bufferSize_);
    }
  }
  return true;
}