thrift/lib/cpp2/transport/rocket/framing/Parser-inl.h (482 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <algorithm> #include <chrono> #include <exception> #include <memory> #include <utility> #include <folly/ExceptionString.h> #include <folly/ExceptionWrapper.h> #include <folly/io/Cursor.h> #include <folly/io/IOBuf.h> #include <folly/io/async/AsyncSocketException.h> #include <folly/io/async/DelayedDestruction.h> #include <thrift/lib/cpp/transport/TTransportException.h> #include <thrift/lib/cpp2/transport/rocket/framing/FrameType.h> #include <thrift/lib/cpp2/transport/rocket/framing/Frames.h> #include <thrift/lib/cpp2/transport/rocket/framing/Serializer.h> #include <thrift/lib/cpp2/transport/rocket/framing/Util.h> namespace apache { namespace thrift { namespace rocket { template <class T> bool Parser<T>::customAlloc( folly::IOBuf& buffer, size_t startOffset, size_t frameSize) { auto iobuf = getCustomAllocBuf( std::max(buffer.length(), frameSize), startOffset, buffer.length()); if (UNLIKELY(!iobuf)) { return false; } memcpy(iobuf->writableData(), buffer.writableData(), buffer.length()); buffer = *std::move(iobuf); return true; } template <class T> bool Parser<T>::customAlloc( folly::IOBufQueue& bufQueue, size_t startOffset, size_t frameSize) { auto iobuf = getCustomAllocBuf( std::max(bufQueue.chainLength(), frameSize), startOffset, bufQueue.chainLength()); if (UNLIKELY(!iobuf)) { return false; } folly::io::Cursor cursor(bufQueue.front()); cursor.pull(iobuf->writableData(), bufQueue.chainLength()); folly::IOBufQueue bufQ{folly::IOBufQueue::cacheChainLength()}; bufQ.append(*std::move(iobuf)); bufQueue = std::move(bufQ); return true; } template <class T> std::unique_ptr<folly::IOBuf> Parser<T>::getCustomAllocBuf( size_t numBytes, size_t startOffset, size_t trimLength) { // we still support 4K alloc if the customAlloc function returns aligned mem static constexpr size_t kPageSize = 4096; const size_t padding = kPageSize - (startOffset % kPageSize); const size_t size = numBytes + padding; auto iobuf = owner_.customAlloc(size); if (UNLIKELY(!iobuf)) { return iobuf; } iobuf->trimStart(padding); iobuf->trimEnd(size - std::min(numBytes, trimLength) - padding); return iobuf; } template <class T> void Parser<T>::getReadBufferOld(void** bufout, size_t* lenout) { DCHECK(!readBuffer_.isChained()); if (LIKELY( allocType_ == apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT)) { if (periodicResizeBufferTimeout_ == 0) { const auto now = std::chrono::steady_clock::now(); if (now - lastResizeTime_ > resizeBufferTimeout_) { resizeBuffer(); lastResizeTime_ = now; } } readBuffer_.unshareOne(); if (readBuffer_.length() == 0) { DCHECK(readBuffer_.capacity() > 0); // If we read everything, reset pointers to 0 and reuse the buffer readBuffer_.clear(); } else if (readBuffer_.headroom() > 0) { // Move partially read data to the beginning readBuffer_.retreat(readBuffer_.headroom()); } } *bufout = readBuffer_.writableTail(); *lenout = readBuffer_.tailroom(); } template <class T> void Parser<T>::getReadBufferNew(void** bufout, size_t* lenout) { const auto ret = readBufQueue_.preallocate(bufferSize_, kMaxBufferSize); *bufout = ret.first; *lenout = ret.second; return; } template <class T> void Parser<T>::getReadBufferHybrid(void** bufout, size_t* lenout) { // if dynamic buffer is not null, read the remainder of currentFrameLength_ // into it, so it contains exactly one full frame if (dynamicBuffer_) { *bufout = dynamicBuffer_->writableTail(); *lenout = currentFrameLength_ - dynamicBuffer_->length(); } else { if (!readBuffer_.isSharedOne()) { // without external refs, we can move data (same as clear() if length==0) readBuffer_.retreat(readBuffer_.headroom()); } else if (reallocateIfShared_) { auto buf = folly::IOBuf(folly::IOBuf::CreateOp(), kStaticBufferSize); memcpy(buf.writableData(), readBuffer_.data(), readBuffer_.length()); buf.append(readBuffer_.length()); readBuffer_ = std::move(buf); } reallocateIfShared_ = false; *bufout = readBuffer_.writableTail(); *lenout = readBuffer_.tailroom(); } } template <class T> void Parser<T>::readDataAvailableOld(size_t nbytes) { readBuffer_.append(nbytes); while (!readBuffer_.empty()) { if (readBuffer_.length() < Serializer::kMinimumFrameHeaderLength) { return; } folly::io::Cursor cursor(&readBuffer_); const size_t totalFrameSize = Serializer::kBytesForFrameOrMetadataLength + readFrameOrMetadataSize(cursor); if (!currentFrameLength_) { if (!owner_.incMemoryUsage(totalFrameSize)) { return; } currentFrameLength_ = totalFrameSize; } readStreamId(cursor); uint8_t frameType; std::tie(frameType, std::ignore) = readFrameTypeAndFlagsUnsafe(cursor); if (UNLIKELY( static_cast<FrameType>(frameType) == FrameType::EXT && allocType_ == apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT)) { if (readBuffer_.length() < Serializer::kBytesForFrameOrMetadataLength + ExtFrame::frameHeaderSize()) { return; } ExtFrameType extType = readExtFrameType(cursor); if (UNLIKELY(extType == ExtFrameType::ALIGNED_PAGE)) { if (alignTo4k( readBuffer_, Serializer::kBytesForFrameOrMetadataLength + ExtFrame::frameHeaderSize(), totalFrameSize)) { allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_PAGE_ALIGN; } } else if (UNLIKELY(extType == ExtFrameType::CUSTOM_ALLOC)) { if (customAlloc( readBuffer_, Serializer::kBytesForFrameOrMetadataLength + ExtFrame::frameHeaderSize(), totalFrameSize)) { allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_CUSTOM; } } } if (readBuffer_.length() < totalFrameSize) { if (readBuffer_.length() + readBuffer_.tailroom() < totalFrameSize) { DCHECK(!readBuffer_.isChained()); readBuffer_.unshareOne(); bufferSize_ = std::max<size_t>(bufferSize_, totalFrameSize); readBuffer_.reserve( 0 /* minHeadroom */, bufferSize_ - readBuffer_.length() /* minTailroom */); } return; } // Otherwise, we have a full frame to handle. const size_t bytesToClone = totalFrameSize - Serializer::kBytesForFrameOrMetadataLength; cursor.reset(&readBuffer_); readFrameOrMetadataSize(cursor); std::unique_ptr<folly::IOBuf> frame; cursor.clone(frame, bytesToClone); owner_.decMemoryUsage(currentFrameLength_); currentFrameLength_ = 0; readBuffer_.trimStart(totalFrameSize); allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT; owner_.handleFrame(std::move(frame)); } if (periodicResizeBufferTimeout_ != 0 && !isScheduled() && bufferSize_ > kMaxBufferSize) { owner_.scheduleTimeout( this, std::chrono::seconds(periodicResizeBufferTimeout_)); } } template <class T> void Parser<T>::readDataAvailableNew(size_t nbytes) { readBufQueue_.postallocate(nbytes); while (!readBufQueue_.empty()) { if (readBufQueue_.chainLength() < Serializer::kMinimumFrameHeaderLength) { return; } folly::io::Cursor cursor(readBufQueue_.front()); const size_t totalFrameSize = Serializer::kBytesForFrameOrMetadataLength + readFrameOrMetadataSize(cursor); if (!currentFrameLength_) { if (!owner_.incMemoryUsage(totalFrameSize)) { return; } currentFrameLength_ = totalFrameSize; } readStreamId(cursor); uint8_t frameType; std::tie(frameType, std::ignore) = readFrameTypeAndFlagsUnsafe(cursor); if (UNLIKELY( static_cast<FrameType>(frameType) == FrameType::EXT && allocType_ == apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT)) { if (readBufQueue_.chainLength() < Serializer::kBytesForFrameOrMetadataLength + ExtFrame::frameHeaderSize()) { return; } ExtFrameType extType = readExtFrameType(cursor); if (UNLIKELY(extType == ExtFrameType::ALIGNED_PAGE)) { if (alignTo4kBufQueue( readBufQueue_, Serializer::kBytesForFrameOrMetadataLength + ExtFrame::frameHeaderSize(), totalFrameSize)) { allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_PAGE_ALIGN; } } else if (UNLIKELY(extType == ExtFrameType::CUSTOM_ALLOC)) { if (customAlloc( readBuffer_, Serializer::kBytesForFrameOrMetadataLength + ExtFrame::frameHeaderSize(), totalFrameSize)) { allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_CUSTOM; } } } if (readBufQueue_.chainLength() < currentFrameLength_) { bufferSize_ = currentFrameLength_ - readBufQueue_.chainLength(); return; } // Otherwise, we have a full frame to handle. readBufQueue_.trimStart(Serializer::kBytesForFrameOrMetadataLength); auto frame = readBufQueue_.split( currentFrameLength_ - Serializer::kBytesForFrameOrMetadataLength); owner_.handleFrame(std::move(frame)); owner_.decMemoryUsage(currentFrameLength_); currentFrameLength_ = 0; bufferSize_ = kMinBufferSize; allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT; } } template <class T> void Parser<T>::readDataAvailableHybrid(size_t nbytes) { if (dynamicBuffer_) { dynamicBuffer_->append(nbytes); if (dynamicBuffer_->length() < currentFrameLength_) { return; } DCHECK_EQ(dynamicBuffer_->length(), currentFrameLength_); owner_.handleFrame(std::move(dynamicBuffer_)); owner_.decMemoryUsage(currentFrameLength_); currentFrameLength_ = 0; currentFrameType_ = 0; return; } // set reallocate hint if latest read filled most of the buffer DCHECK_LE(nbytes, readBuffer_.tailroom()); reallocateIfShared_ = readBuffer_.tailroom() - nbytes < kReallocateThreshold; readBuffer_.append(nbytes); while (!readBuffer_.empty()) { const size_t bufLen = readBuffer_.length(); if (bufLen < Serializer::kMinimumFrameHeaderLength) { return; } folly::io::Cursor cursor(&readBuffer_); if (!currentFrameLength_) { auto frameLength = readFrameOrMetadataSize(cursor); if (!owner_.incMemoryUsage(frameLength)) { return; } currentFrameLength_ = frameLength; // skip over stream ID cursor.skipNoAdvance(sizeof(StreamId::underlying_type)); // read frameType and ignore flags std::tie(currentFrameType_, std::ignore) = readFrameTypeAndFlagsUnsafe(cursor); } else { cursor.skipNoAdvance(Serializer::kMinimumFrameHeaderLength); } const size_t totalSize = currentFrameLength_ + Serializer::kBytesForFrameOrMetadataLength; // check for alignment/custom alloc frame if (UNLIKELY( static_cast<FrameType>(currentFrameType_) == FrameType::EXT && allocType_ == apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT)) { if (bufLen < Serializer::kBytesForFrameOrMetadataLength + ExtFrame::frameHeaderSize()) { return; } ExtFrameType extType = readExtFrameType(cursor); if (UNLIKELY(extType == ExtFrameType::ALIGNED_PAGE)) { allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_PAGE_ALIGN; const size_t bytesToCopy = std::min( currentFrameLength_, readBuffer_.length() - Serializer::kBytesForFrameOrMetadataLength); dynamicBuffer_ = get4kAlignedBuf( currentFrameLength_, ExtFrame::frameHeaderSize(), bytesToCopy); if (LIKELY(dynamicBuffer_ != nullptr)) { allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT; readBuffer_.trimStart(Serializer::kBytesForFrameOrMetadataLength); memcpy( dynamicBuffer_->writableData(), readBuffer_.data(), bytesToCopy); readBuffer_.trimStart(bytesToCopy); // if we had the full frame, send it right away and continue loop if (bytesToCopy == currentFrameLength_) { owner_.handleFrame(std::move(dynamicBuffer_)); owner_.decMemoryUsage(currentFrameLength_); currentFrameLength_ = 0; currentFrameType_ = 0; continue; } // otherwise, return to read rest of frame into dynamic buffer return; } } else if (UNLIKELY(extType == ExtFrameType::CUSTOM_ALLOC)) { // TBD - remove duplicated code allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_CUSTOM; const size_t bytesToCopy = std::min( currentFrameLength_, readBuffer_.length() - Serializer::kBytesForFrameOrMetadataLength); dynamicBuffer_ = getCustomAllocBuf( currentFrameLength_, ExtFrame::frameHeaderSize(), bytesToCopy); if (LIKELY(dynamicBuffer_ != nullptr)) { allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT; readBuffer_.trimStart(Serializer::kBytesForFrameOrMetadataLength); memcpy( dynamicBuffer_->writableData(), readBuffer_.data(), bytesToCopy); readBuffer_.trimStart(bytesToCopy); // if we had the full frame, send it right away and continue loop if (bytesToCopy == currentFrameLength_) { owner_.handleFrame(std::move(dynamicBuffer_)); owner_.decMemoryUsage(currentFrameLength_); currentFrameLength_ = 0; currentFrameType_ = 0; continue; } // otherwise, return to read rest of frame into dynamic buffer return; } } } // we may have an incomplete frame if (totalSize > bufLen) { // switch to dynamic buffer only if there is no way to fit the whole frame // into the static buffer if (totalSize - bufLen > readBuffer_.tailroom() && LIKELY(totalSize > kStaticBufferSize || readBuffer_.isSharedOne())) { dynamicBuffer_ = folly::IOBuf::createCombined(currentFrameLength_); readBuffer_.trimStart(Serializer::kBytesForFrameOrMetadataLength); memcpy( dynamicBuffer_->writableData(), readBuffer_.data(), readBuffer_.length()); dynamicBuffer_->append(readBuffer_.length()); // "free" the data we just copied in readBuffer_ readBuffer_.prepend(Serializer::kBytesForFrameOrMetadataLength); readBuffer_.trimEnd(readBuffer_.length()); } return; } // otherwise, we have a full frame cursor.reset(&readBuffer_); cursor.skipNoAdvance(Serializer::kBytesForFrameOrMetadataLength); std::unique_ptr<folly::IOBuf> frame; cursor.clone(frame, currentFrameLength_); readBuffer_.trimStart(totalSize); allocType_ = apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT; owner_.handleFrame(std::move(frame)); owner_.decMemoryUsage(currentFrameLength_); currentFrameLength_ = 0; currentFrameType_ = 0; } } template <class T> void Parser<T>::getReadBuffer(void** bufout, size_t* lenout) { if (newBufferLogicEnabled_) { getReadBufferNew(bufout, lenout); } else if (hybridBufferLogicEnabled_) { getReadBufferHybrid(bufout, lenout); } else { getReadBufferOld(bufout, lenout); } } template <class T> void Parser<T>::readDataAvailable(size_t nbytes) noexcept { folly::DelayedDestruction::DestructorGuard dg(&this->owner_); try { if (newBufferLogicEnabled_) { readDataAvailableNew(nbytes); } else if (hybridBufferLogicEnabled_) { readDataAvailableHybrid(nbytes); } else { readDataAvailableOld(nbytes); } } catch (...) { auto exceptionStr = folly::exceptionStr(std::current_exception()).toStdString(); LOG(ERROR) << "Bad frame received, closing connection: " << exceptionStr; owner_.close(transport::TTransportException(exceptionStr)); } } template <class T> void Parser<T>::readEOF() noexcept { folly::DelayedDestruction::DestructorGuard dg(&this->owner_); owner_.close(transport::TTransportException( transport::TTransportException::TTransportExceptionType::END_OF_FILE, "Channel got EOF. Check for server hitting connection limit, " "server connection idle timeout, and server crashes.")); } template <class T> void Parser<T>::readErr(const folly::AsyncSocketException& ex) noexcept { folly::DelayedDestruction::DestructorGuard dg(&this->owner_); owner_.close(transport::TTransportException(ex)); } // TODO: This should be removed once the new buffer logic controlled by // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. template <class T> void Parser<T>::timeoutExpired() noexcept { if (LIKELY( allocType_ == apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT)) { resizeBuffer(); } } template <class T> void Parser<T>::readBufferAvailable( std::unique_ptr<folly::IOBuf> buf) noexcept { folly::DelayedDestruction::DestructorGuard dg(&this->owner_); try { readBufQueue_.append(std::move(buf)); while (!readBufQueue_.empty()) { if (readBufQueue_.chainLength() < Serializer::kBytesForFrameOrMetadataLength) { return; } folly::io::Cursor cursor(readBufQueue_.front()); if (!currentFrameLength_) { currentFrameLength_ = Serializer::kBytesForFrameOrMetadataLength + readFrameOrMetadataSize(cursor); if (!owner_.incMemoryUsage(currentFrameLength_)) { currentFrameLength_ = 0; return; } } if (readBufQueue_.chainLength() < currentFrameLength_) { return; } readBufQueue_.trimStart(Serializer::kBytesForFrameOrMetadataLength); auto frame = readBufQueue_.split( currentFrameLength_ - Serializer::kBytesForFrameOrMetadataLength); owner_.handleFrame(std::move(frame)); owner_.decMemoryUsage(currentFrameLength_); currentFrameLength_ = 0; } } catch (...) { auto exceptionStr = folly::exceptionStr(std::current_exception()).toStdString(); LOG(ERROR) << "Bad frame received, closing connection: " << exceptionStr; owner_.close(transport::TTransportException(exceptionStr)); } } // TODO: This should be removed once the new buffer logic controlled by // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. template <class T> void Parser<T>::resizeBuffer() { if (bufferSize_ <= kMaxBufferSize || readBuffer_.length() >= kMaxBufferSize) { return; } // resize readBuffer_ to kMaxBufferSize readBuffer_ = folly::IOBuf( folly::IOBuf::CopyBufferOp(), readBuffer_.data(), readBuffer_.length(), /* headroom */ 0, /* tailroom */ kMaxBufferSize - readBuffer_.length()); bufferSize_ = kMaxBufferSize; } template <class T> constexpr size_t Parser<T>::kMinBufferSize; template <class T> constexpr size_t Parser<T>::kMaxBufferSize; // TODO: This should be removed once the new buffer logic controlled by // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. template <class T> constexpr std::chrono::milliseconds Parser<T>::kDefaultBufferResizeInterval; template <class T> constexpr size_t Parser<T>::kStaticBufferSize; template <class T> constexpr size_t Parser<T>::kReallocateThreshold; } // namespace rocket } // namespace thrift } // namespace apache