void Parser::readDataAvailableHybrid()

in thrift/lib/cpp2/transport/rocket/framing/Parser-inl.h [307:442]


// Accounts for `nbytes` of newly received data and hands every frame that has
// become complete to owner_.handleFrame().
//
// Two buffers are involved:
//  * dynamicBuffer_ - when set, a single frame is being assembled in it. The
//    new bytes are appended to it and the frame is delivered once its length
//    reaches currentFrameLength_.
//  * readBuffer_    - otherwise the new bytes extend readBuffer_, which may
//    hold several frames. Each pass of the loop parses one frame header and
//    then either delivers the frame, moves it to a dynamic buffer, or returns
//    to wait for more data.
//
// In both cases the bytes themselves are presumably already in the buffer's
// tailroom when this is called (only the length is advanced here) - confirm
// against the read callback that supplies the buffer.
//
// Memory accounting: owner_.incMemoryUsage() is consulted once per frame when
// its length is first parsed (parsing stops if it returns false), and
// owner_.decMemoryUsage() is called right after the frame is delivered.
//
// @param nbytes number of bytes that were added to the active buffer.
void Parser<T>::readDataAvailableHybrid(size_t nbytes) {
  using MemAllocType = apache::thrift::RpcOptions::MemAllocType;

  // Delivers a finished frame to the owner, releases the memory accounted for
  // it and clears the per-frame parsing state. The buffer is taken by rvalue
  // reference so it reaches handleFrame() exactly as the caller passed it.
  const auto deliverFrame = [this](std::unique_ptr<folly::IOBuf>&& frame) {
    owner_.handleFrame(std::move(frame));
    owner_.decMemoryUsage(currentFrameLength_);
    currentFrameLength_ = 0;
    currentFrameType_ = 0;
  };

  if (dynamicBuffer_) {
    // A frame is being assembled in its own buffer: extend it and deliver it
    // as soon as all currentFrameLength_ bytes are present.
    dynamicBuffer_->append(nbytes);
    if (dynamicBuffer_->length() >= currentFrameLength_) {
      DCHECK_EQ(dynamicBuffer_->length(), currentFrameLength_);
      deliverFrame(std::move(dynamicBuffer_));
    }
    return;
  }

  // Record a reallocation hint when this read left less than
  // kReallocateThreshold bytes of tailroom in the static buffer. This must be
  // computed before append() consumes the tailroom.
  DCHECK_LE(nbytes, readBuffer_.tailroom());
  const auto tailroomAfterRead = readBuffer_.tailroom() - nbytes;
  reallocateIfShared_ = tailroomAfterRead < kReallocateThreshold;
  readBuffer_.append(nbytes);

  while (!readBuffer_.empty()) {
    const size_t buffered = readBuffer_.length();
    if (buffered < Serializer::kMinimumFrameHeaderLength) {
      // not even a minimal header yet; wait for more data
      return;
    }

    folly::io::Cursor cursor(&readBuffer_);
    if (currentFrameLength_) {
      // The header of this frame was parsed on an earlier call; just step
      // over it (presumably to the same position the branch below reaches).
      cursor.skipNoAdvance(Serializer::kMinimumFrameHeaderLength);
    } else {
      auto announcedLength = readFrameOrMetadataSize(cursor);
      if (!owner_.incMemoryUsage(announcedLength)) {
        // owner refused to account for this frame; stop parsing
        return;
      }
      currentFrameLength_ = announcedLength;
      // the stream ID is not needed here
      cursor.skipNoAdvance(sizeof(StreamId::underlying_type));
      // keep the frame type, drop the flags
      std::tie(currentFrameType_, std::ignore) =
          readFrameTypeAndFlagsUnsafe(cursor);
    }
    // bytes this frame occupies in readBuffer_, length prefix included
    const size_t wireSize =
        currentFrameLength_ + Serializer::kBytesForFrameOrMetadataLength;

    // EXT frames may request a specially allocated destination buffer. This
    // is only attempted while allocType_ is ALLOC_DEFAULT.
    if (UNLIKELY(
            static_cast<FrameType>(currentFrameType_) == FrameType::EXT &&
            allocType_ == MemAllocType::ALLOC_DEFAULT)) {
      if (buffered < Serializer::kBytesForFrameOrMetadataLength +
              ExtFrame::frameHeaderSize()) {
        // the EXT header itself is still incomplete
        return;
      }
      const ExtFrameType extType = readExtFrameType(cursor);
      const bool pageAligned = (extType == ExtFrameType::ALIGNED_PAGE);
      const bool customAlloc =
          !pageAligned && (extType == ExtFrameType::CUSTOM_ALLOC);
      if (UNLIKELY(pageAligned || customAlloc)) {
        // Remember which allocation was requested. If the allocation below
        // fails this value is left in place, so the EXT check above is skipped
        // on later passes and the frame is handled by the generic code that
        // follows; allocType_ returns to ALLOC_DEFAULT when a frame is cloned
        // out of readBuffer_.
        // NOTE(review): the dynamic-buffer completion path at the top of this
        // function does not reset allocType_ - confirm that is intended.
        allocType_ = pageAligned ? MemAllocType::ALLOC_PAGE_ALIGN
                                 : MemAllocType::ALLOC_CUSTOM;
        // as much of the frame as is already buffered (without the prefix)
        const size_t bytesToCopy = std::min(
            currentFrameLength_,
            readBuffer_.length() - Serializer::kBytesForFrameOrMetadataLength);
        // NOTE(review): no append() is issued on the returned buffer here, so
        // the helpers presumably size its length to bytesToCopy - confirm.
        if (pageAligned) {
          dynamicBuffer_ = get4kAlignedBuf(
              currentFrameLength_, ExtFrame::frameHeaderSize(), bytesToCopy);
        } else {
          dynamicBuffer_ = getCustomAllocBuf(
              currentFrameLength_, ExtFrame::frameHeaderSize(), bytesToCopy);
        }
        if (LIKELY(dynamicBuffer_ != nullptr)) {
          allocType_ = MemAllocType::ALLOC_DEFAULT;
          // drop the length prefix, then move the buffered part of the frame
          readBuffer_.trimStart(Serializer::kBytesForFrameOrMetadataLength);
          memcpy(
              dynamicBuffer_->writableData(), readBuffer_.data(), bytesToCopy);
          readBuffer_.trimStart(bytesToCopy);
          if (bytesToCopy != currentFrameLength_) {
            // the remainder of the frame will be read into dynamicBuffer_
            return;
          }
          // the whole frame was already buffered: deliver it and keep parsing
          deliverFrame(std::move(dynamicBuffer_));
          continue;
        }
      }
    }

    if (wireSize > buffered) {
      // The frame is incomplete. Move it to a dynamic buffer only when the
      // missing part exceeds the remaining tailroom and the frame either is
      // larger than kStaticBufferSize or readBuffer_ reports being shared;
      // otherwise just wait for more data.
      if (wireSize - buffered > readBuffer_.tailroom() &&
          LIKELY(wireSize > kStaticBufferSize || readBuffer_.isSharedOne())) {
        dynamicBuffer_ = folly::IOBuf::createCombined(currentFrameLength_);
        // copy what has arrived so far, minus the length prefix
        readBuffer_.trimStart(Serializer::kBytesForFrameOrMetadataLength);
        const auto partialLength = readBuffer_.length();
        memcpy(
            dynamicBuffer_->writableData(), readBuffer_.data(), partialLength);
        dynamicBuffer_->append(partialLength);
        // undo the trimStart above, then drop everything: readBuffer_ ends up
        // empty with its data pointer where it was before
        readBuffer_.prepend(Serializer::kBytesForFrameOrMetadataLength);
        readBuffer_.trimEnd(readBuffer_.length());
      }
      return;
    }

    // A complete frame is buffered: clone its payload (everything after the
    // length prefix) out of readBuffer_ and consume it from the buffer.
    cursor.reset(&readBuffer_);
    cursor.skipNoAdvance(Serializer::kBytesForFrameOrMetadataLength);
    std::unique_ptr<folly::IOBuf> completeFrame;
    cursor.clone(completeFrame, currentFrameLength_);
    readBuffer_.trimStart(wireSize);
    allocType_ = MemAllocType::ALLOC_DEFAULT;
    deliverFrame(std::move(completeFrame));
  }
}