in thrift/lib/cpp2/transport/rocket/framing/Parser-inl.h [307:442]
/**
 * Consumes `nbytes` of freshly received data and dispatches every complete
 * frame found to `owner_.handleFrame()`.
 *
 * Two buffers are involved:
 *  - `readBuffer_`: the regular receive buffer. Frames that fit in it are
 *    cloned out of it and handed to the owner.
 *  - `dynamicBuffer_`: a per-frame buffer that, once set, receives the
 *    remainder of the current frame. It is created here either because the
 *    frame cannot fit into `readBuffer_`, or because an EXT frame requested a
 *    page-aligned / custom allocation.
 *
 * Per-frame state (`currentFrameLength_`, `currentFrameType_`) survives across
 * calls so that a partially received frame is resumed on the next call.
 *
 * @param nbytes number of bytes that were just written into the tail of the
 *               active buffer (`dynamicBuffer_` if set, else `readBuffer_`).
 */
void Parser<T>::readDataAvailableHybrid(size_t nbytes) {
  using AllocType = apache::thrift::RpcOptions::MemAllocType;
  // Size of the length prefix that precedes every frame; it is not counted in
  // currentFrameLength_ (see totalSize below).
  const size_t kLengthPrefixBytes = Serializer::kBytesForFrameOrMetadataLength;

  // Hands a completed frame to the owner, releases the memory accounted for
  // it and clears the per-frame state so the next header is parsed afresh.
  auto deliverFrame = [this](std::unique_ptr<folly::IOBuf>&& completed) {
    owner_.handleFrame(std::move(completed));
    owner_.decMemoryUsage(currentFrameLength_);
    currentFrameLength_ = 0;
    currentFrameType_ = 0;
  };

  // A dynamic buffer is active: the new bytes belong to the frame being
  // assembled there.
  if (dynamicBuffer_) {
    dynamicBuffer_->append(nbytes);
    if (dynamicBuffer_->length() < currentFrameLength_) {
      // still waiting for the rest of the frame
      return;
    }
    DCHECK_EQ(dynamicBuffer_->length(), currentFrameLength_);
    deliverFrame(std::move(dynamicBuffer_));
    return;
  }

  // set reallocate hint if latest read filled most of the buffer
  DCHECK_LE(nbytes, readBuffer_.tailroom());
  reallocateIfShared_ = readBuffer_.tailroom() - nbytes < kReallocateThreshold;
  readBuffer_.append(nbytes);

  while (!readBuffer_.empty()) {
    const size_t bufLen = readBuffer_.length();
    if (bufLen < Serializer::kMinimumFrameHeaderLength) {
      // not enough data to parse a frame header yet
      return;
    }

    folly::io::Cursor cursor(&readBuffer_);
    if (currentFrameLength_) {
      // Header was already parsed by an earlier pass; just step over it.
      cursor.skipNoAdvance(Serializer::kMinimumFrameHeaderLength);
    } else {
      auto frameLength = readFrameOrMetadataSize(cursor);
      if (!owner_.incMemoryUsage(frameLength)) {
        // Owner refused to account for this frame; stop parsing.
        return;
      }
      currentFrameLength_ = frameLength;
      // skip over stream ID
      cursor.skipNoAdvance(sizeof(StreamId::underlying_type));
      // read frameType and ignore flags
      std::tie(currentFrameType_, std::ignore) =
          readFrameTypeAndFlagsUnsafe(cursor);
    }

    const size_t totalSize = currentFrameLength_ + kLengthPrefixBytes;

    // check for alignment/custom alloc frame
    if (UNLIKELY(
            static_cast<FrameType>(currentFrameType_) == FrameType::EXT &&
            allocType_ == AllocType::ALLOC_DEFAULT)) {
      if (bufLen < kLengthPrefixBytes + ExtFrame::frameHeaderSize()) {
        // EXT header not fully received yet
        return;
      }
      const ExtFrameType extType = readExtFrameType(cursor);
      const bool wantsPageAligned = extType == ExtFrameType::ALIGNED_PAGE;
      if (UNLIKELY(wantsPageAligned || extType == ExtFrameType::CUSTOM_ALLOC)) {
        // Record the requested allocation type. It is reset to ALLOC_DEFAULT
        // below on success; on allocation failure it stays set, so the EXT
        // check above is not re-entered while allocType_ remains non-default.
        allocType_ = wantsPageAligned ? AllocType::ALLOC_PAGE_ALIGN
                                      : AllocType::ALLOC_CUSTOM;
        // Bytes of this frame that are already sitting in readBuffer_.
        const size_t bytesToCopy =
            std::min(currentFrameLength_, bufLen - kLengthPrefixBytes);
        if (wantsPageAligned) {
          dynamicBuffer_ = get4kAlignedBuf(
              currentFrameLength_, ExtFrame::frameHeaderSize(), bytesToCopy);
        } else {
          dynamicBuffer_ = getCustomAllocBuf(
              currentFrameLength_, ExtFrame::frameHeaderSize(), bytesToCopy);
        }
        if (LIKELY(dynamicBuffer_ != nullptr)) {
          allocType_ = AllocType::ALLOC_DEFAULT;
          // Drop the length prefix, then move the frame bytes received so far
          // into the dynamic buffer.
          // NOTE(review): no append() follows the memcpy, so the allocator
          // helpers presumably return a buffer whose length already covers
          // bytesToCopy -- confirm against their definitions.
          readBuffer_.trimStart(kLengthPrefixBytes);
          memcpy(
              dynamicBuffer_->writableData(), readBuffer_.data(), bytesToCopy);
          readBuffer_.trimStart(bytesToCopy);
          // if we had the full frame, send it right away and continue loop
          if (bytesToCopy == currentFrameLength_) {
            deliverFrame(std::move(dynamicBuffer_));
            continue;
          }
          // otherwise, return to read rest of frame into dynamic buffer
          return;
        }
        // Allocation failed: fall through and treat it as a regular frame.
      }
    }

    // we may have an incomplete frame
    if (totalSize > bufLen) {
      // switch to dynamic buffer only if there is no way to fit the whole frame
      // into the static buffer
      if (totalSize - bufLen > readBuffer_.tailroom() &&
          LIKELY(totalSize > kStaticBufferSize || readBuffer_.isSharedOne())) {
        dynamicBuffer_ = folly::IOBuf::createCombined(currentFrameLength_);
        readBuffer_.trimStart(kLengthPrefixBytes);
        const size_t receivedSoFar = readBuffer_.length();
        memcpy(
            dynamicBuffer_->writableData(), readBuffer_.data(), receivedSoFar);
        dynamicBuffer_->append(receivedSoFar);
        // "free" the data we just copied in readBuffer_: restore the start
        // position moved by trimStart(), then drop everything so the buffer
        // is empty again.
        readBuffer_.prepend(kLengthPrefixBytes);
        readBuffer_.trimEnd(readBuffer_.length());
      }
      return;
    }

    // otherwise, we have a full frame: clone it (without the length prefix)
    // out of the read buffer and consume it.
    cursor.reset(&readBuffer_);
    cursor.skipNoAdvance(kLengthPrefixBytes);
    std::unique_ptr<folly::IOBuf> frame;
    cursor.clone(frame, currentFrameLength_);
    readBuffer_.trimStart(totalSize);
    allocType_ = AllocType::ALLOC_DEFAULT;
    deliverFrame(std::move(frame));
  }
}