thrift/lib/cpp/transport/THeader.h (273 lines of code) (raw):

/* * Copyright (c) Facebook, Inc. and its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef THRIFT_TRANSPORT_THEADER_H_ #define THRIFT_TRANSPORT_THEADER_H_ 1 #include <functional> #include <map> #include <optional> #include <string_view> #include <vector> #include <folly/Optional.h> #include <folly/String.h> #include <folly/Utility.h> #include <folly/portability/Unistd.h> #include <thrift/lib/cpp/concurrency/Thread.h> #include <thrift/lib/cpp/protocol/TProtocolTypes.h> #include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h> #include <bitset> #include <chrono> // These are local to this build and never appear on the wire. enum CLIENT_TYPE { THRIFT_HEADER_CLIENT_TYPE = 0, THRIFT_FRAMED_DEPRECATED = 1, THRIFT_UNFRAMED_DEPRECATED = 2, THRIFT_HTTP_SERVER_TYPE = 3, THRIFT_HTTP_CLIENT_TYPE = 4, THRIFT_FRAMED_COMPACT = 5, THRIFT_ROCKET_CLIENT_TYPE = 6, THRIFT_HTTP_GET_CLIENT_TYPE = 7, THRIFT_UNFRAMED_COMPACT_DEPRECATED = 8, THRIFT_HTTP2_CLIENT_TYPE = 9, // This MUST always be last and have the largest value! THRIFT_UNKNOWN_CLIENT_TYPE = 10, }; #define CLIENT_TYPES_LEN THRIFT_UNKNOWN_CLIENT_TYPE // These appear on the wire. enum HEADER_FLAGS { HEADER_FLAG_SUPPORT_OUT_OF_ORDER = 0x01, // Set for reverse messages (server->client requests, client->server replies) HEADER_FLAG_DUPLEX_REVERSE = 0x08, }; namespace folly { class IOBuf; class IOBufQueue; } // namespace folly namespace apache { namespace thrift { namespace util { class THttpClientParser; } } // namespace thrift } // namespace apache namespace apache { namespace thrift { namespace transport { namespace detail { /** * This is a helper class to facilitate transport upgrade from header to rocket * for non-TLS services. The socket stored in header channel is a shared_ptr * while the socket in rocket is a unique_ptr. The goal is to transfer the * socket from header to rocket, by managing the lifetime using this custom * deleter which makes it possible for the unique_ptr stolen by stealPtr() * outlives the shared_ptr. */ template <typename T, typename Deleter> class ReleaseDeleter { public: explicit ReleaseDeleter(std::unique_ptr<T, Deleter> uPtr) : ptr_(uPtr.release()), deleter_(uPtr.get_deleter()) {} void operator()(T* obj) { (void)obj; if (ptr_) { DCHECK(obj == ptr_); deleter_(ptr_); } } /** * Steal the unique_ptr stored in this deleter. */ std::unique_ptr<T, Deleter> stealPtr() { DCHECK(ptr_); auto uPtr = std::unique_ptr<T, Deleter>(ptr_, deleter_); ptr_ = nullptr; return uPtr; } private: T* ptr_; Deleter deleter_; }; template <typename T, typename Deleter> std::shared_ptr<T> convertToShared(std::unique_ptr<T, Deleter> uPtr) { auto ptr = uPtr.get(); auto deleter = ReleaseDeleter<T, Deleter>(std::move(uPtr)); return std::shared_ptr<T>(ptr, deleter); } } // namespace detail using apache::thrift::protocol::T_BINARY_PROTOCOL; using apache::thrift::protocol::T_COMPACT_PROTOCOL; /** * Class that will take an IOBuf and wrap it in some thrift headers. * see thrift/doc/HeaderFormat.txt for details. * * Supports transforms: zlib snappy zstd * Supports headers: http-style key/value per request and per connection * other: Protocol Id and seq ID in header. * * Backwards compatible with TFramed format, and unframed format, assuming * your server transport is compatible (some server types require 4-byte size * at the start). */ class THeader final { public: enum { ALLOW_BIG_FRAMES = 1 << 0, }; explicit THeader(int options = 0); void setClientType(CLIENT_TYPE ct) { this->clientType_ = ct; } // Force using specified client type when using legacy client types // i.e. sniffing out client type is disabled. void forceClientType(bool enable) { forceClientType_ = enable; } CLIENT_TYPE getClientType() const { return clientType_; } uint16_t getProtocolId() const { return protoId_; } void setProtocolId(uint16_t protoId) { this->protoId_ = protoId; } int8_t getProtocolVersion() const; void setProtocolVersion(uint8_t ver) { this->protoVersion_ = ver; } void resetProtocol(); uint16_t getFlags() const { return flags_; } void setFlags(uint16_t flags) { flags_ = flags; } // Info headers typedef std::map<std::string, std::string> StringToStringMap; /** * We know we got a packet in header format here, try to parse the header * * @param IObuf of the header + data. Untransforms the data as appropriate. * @return Just the data section in an IOBuf */ std::unique_ptr<folly::IOBuf> readHeaderFormat( std::unique_ptr<folly::IOBuf>, StringToStringMap& persistentReadHeaders); /** * Untransform the data based on the received header flags * On conclusion of function, setReadBuffer is called with the * untransformed data. * * @param IOBuf input data section * @return IOBuf output data section */ static std::unique_ptr<folly::IOBuf> untransform( std::unique_ptr<folly::IOBuf>, std::vector<uint16_t>& readTrans); /** * Transform the data based on our write transform flags * At conclusion of function the write buffer is set to the * transformed data. * * @param IOBuf to transform. Returns transformed IOBuf (or chain) * @return transformed data IOBuf */ static std::unique_ptr<folly::IOBuf> transform( std::unique_ptr<folly::IOBuf>, std::vector<uint16_t>& writeTrans, size_t minCompressBytes = 0); /** * Copy metadata, but not headers. */ void copyMetadataFrom(const THeader& src); static uint16_t getNumTransforms(const std::vector<uint16_t>& transforms) { return folly::to_narrow(transforms.size()); } void setTransform(uint16_t transId); void setReadTransform(uint16_t transId); void setTransforms(const std::vector<uint16_t>& trans) { writeTrans_ = trans; } const std::vector<uint16_t>& getTransforms() const { return readTrans_; } std::vector<uint16_t>& getWriteTransforms() { return writeTrans_; } void setClientMetadata(const ClientMetadata& clientMetadata); std::optional<ClientMetadata> extractClientMetadata(); // these work with write headers void setHeader(const std::string& key, const std::string& value); void setHeader(const std::string& key, std::string&& value); void setHeader( const char* key, size_t keyLength, const char* value, size_t valueLength); void setHeaders(StringToStringMap&&); void clearHeaders(); bool isWriteHeadersEmpty() const; StringToStringMap& mutableWriteHeaders(); StringToStringMap releaseWriteHeaders(); StringToStringMap extractAllWriteHeaders(); const StringToStringMap& getWriteHeaders() const; // these work with read headers void setReadHeaders(StringToStringMap&&); void setReadHeader(const std::string& key, std::string&& value); void eraseReadHeader(const std::string& key); const StringToStringMap& getHeaders() const; StringToStringMap releaseHeaders(); void setExtraWriteHeaders(StringToStringMap* extraWriteHeaders) { extraWriteHeaders_ = extraWriteHeaders; } StringToStringMap* getExtraWriteHeaders() const { return extraWriteHeaders_; } std::string getPeerIdentity() const; void setIdentity(const std::string& identity); // accessors for seqId uint32_t getSequenceNumber() const { return seqId_; } void setSequenceNumber(uint32_t sid) { this->seqId_ = sid; } enum TRANSFORMS { NONE = 0x00, ZLIB_TRANSFORM = 0x01, // HMAC_TRANSFORM = 0x02, Deprecated and no longer supported // SNAPPY_TRANSFORM = 0x03, Deprecated and no longer supported // QLZ_TRANSFORM = 0x04, Deprecated and no longer supported ZSTD_TRANSFORM = 0x05, // DO NOT USE. Sentinel value for enum count. Always keep as last value. TRANSFORM_LAST_FIELD = 0x06, }; /* IOBuf interface */ /** * Adds the header based on the type of transport: * unframed - does nothing. * framed - prepends frame size * header - prepends header, optionally appends mac * http - only supported for sync case, prepends http header. * * @return IOBuf chain with header _and_ data. Data is not copied */ std::unique_ptr<folly::IOBuf> addHeader( std::unique_ptr<folly::IOBuf>, StringToStringMap& persistentWriteHeaders, bool transform = true); /** * Given an IOBuf Chain, remove the header. Supports unframed (sync * only), framed, header, and http (sync case only). This doesn't * check if the client type implied by the header is valid. * isSupportedClient() or checkSupportedClient() should be called * after. * * @param IOBufQueue - queue to try to read message from. * * @param needed - if the return is nullptr (i.e. we didn't read a full * message), needed is set to the number of bytes needed * before you should call removeHeader again. * * @return IOBuf - the message chain. May be shared, may be chained. * If nullptr, we didn't get enough data for a whole message, * call removeHeader again after reading needed more bytes. */ std::unique_ptr<folly::IOBuf> removeHeader( folly::IOBufQueue*, size_t& needed, StringToStringMap& persistentReadHeaders); void setDesiredCompressionConfig(CompressionConfig compressionConfig) { compressionConfig_ = compressionConfig; } folly::Optional<CompressionConfig> getDesiredCompressionConfig() const { return compressionConfig_; } void setCrc32c(folly::Optional<uint32_t> crc32c) { crc32c_ = crc32c; } folly::Optional<uint32_t> getCrc32c() const { return crc32c_; } void setServerLoad(folly::Optional<int64_t> load) { serverLoad_ = load; } folly::Optional<int64_t> getServerLoad() const { return serverLoad_; } apache::thrift::concurrency::PRIORITY getCallPriority() const; std::chrono::milliseconds getTimeoutFromHeader( const std::string& header) const; std::chrono::milliseconds getClientTimeout() const; std::chrono::milliseconds getClientQueueTimeout() const; // Overall queue timeout (either set by client or server override) // This is set on the server side in responses. folly::Optional<std::chrono::milliseconds> getServerQueueTimeout() const; // This is populated by the server and reflects the time the request spent // in the queue prior to processing. folly::Optional<std::chrono::milliseconds> getProcessDelay() const; const folly::Optional<std::string>& clientId() const; const folly::Optional<std::string>& serviceTraceMeta() const; void setHttpClientParser( std::shared_ptr<apache::thrift::util::THttpClientParser>); void setClientTimeout(std::chrono::milliseconds timeout); void setClientQueueTimeout(std::chrono::milliseconds timeout); void setServerQueueTimeout(std::chrono::milliseconds timeout); void setProcessDelay(std::chrono::milliseconds timeQueued); void setCallPriority(apache::thrift::concurrency::PRIORITY priority); void setClientId(const std::string& clientId); void setServiceTraceMeta(const std::string& serviceTraceMeta); // Utility method for converting TRANSFORMS enum to string static const folly::StringPiece getStringTransform( const TRANSFORMS transform); static CLIENT_TYPE tryGetClientType(const folly::IOBuf& data); void setRoutingData(std::shared_ptr<void> data) { routingData_ = std::move(data); } std::shared_ptr<void> releaseRoutingData() { return std::move(routingData_); } // 0 and 16th bits must be 0 to differentiate from framed & unframed static const uint32_t HEADER_MAGIC = 0x0FFF0000; static const uint32_t HEADER_MASK = 0xFFFF0000; static const uint32_t FLAGS_MASK = 0x0000FFFF; static const uint32_t HTTP_SERVER_MAGIC = 0x504F5354; // POST static const uint32_t HTTP_CLIENT_MAGIC = 0x48545450; // HTTP static const uint32_t HTTP_GET_CLIENT_MAGIC = 0x47455420; // GET static const uint32_t HTTP_HEAD_CLIENT_MAGIC = 0x48454144; // HEAD static const uint32_t BIG_FRAME_MAGIC = 0x42494746; // BIGF static const uint32_t MAX_FRAME_SIZE = 0x3FFFFFFF; static const std::string PRIORITY_HEADER; static const std::string& CLIENT_TIMEOUT_HEADER; static const std::string QUEUE_TIMEOUT_HEADER; static const std::string QUERY_LOAD_HEADER; static const std::string kClientId; static const std::string kServiceTraceMeta; static constexpr std::string_view CLIENT_METADATA_HEADER = "client_metadata"; private: static bool isFramed(CLIENT_TYPE clientType); // Use first 64 bits to determine client protocol static folly::Optional<CLIENT_TYPE> analyzeFirst32bit(uint32_t w); static CLIENT_TYPE analyzeSecond32bit(uint32_t w); // Calls appropriate method based on client type // returns nullptr if Header of Unknown type std::unique_ptr<folly::IOBuf> removeNonHeader( folly::IOBufQueue* queue, size_t& needed, CLIENT_TYPE clientType, uint32_t sz); template < template <class BaseProt> class ProtocolClass, protocol::PROTOCOL_TYPES ProtocolID> std::unique_ptr<folly::IOBuf> removeUnframed( folly::IOBufQueue* queue, size_t& needed); std::unique_ptr<folly::IOBuf> removeHttpServer(folly::IOBufQueue* queue); std::unique_ptr<folly::IOBuf> removeHttpClient( folly::IOBufQueue* queue, size_t& needed); std::unique_ptr<folly::IOBuf> removeFramed( uint32_t sz, folly::IOBufQueue* queue); /** * Returns the maximum number of bytes that write k/v headers can take */ size_t getMaxWriteHeadersSize( const StringToStringMap& persistentWriteHeaders) const; /** * Returns whether the 1st byte of the protocol payload should be hadled * as compact framed. */ static bool compactFramed(uint32_t magic); std::optional<std::string> extractHeader(std::string_view key); StringToStringMap& ensureReadHeaders(); StringToStringMap& ensureWriteHeaders(); // Http client parser std::shared_ptr<apache::thrift::util::THttpClientParser> httpClientParser_; int16_t protoId_; int8_t protoVersion_; CLIENT_TYPE clientType_; bool forceClientType_; uint32_t seqId_; uint16_t flags_; std::string identity_; std::vector<uint16_t> readTrans_; std::vector<uint16_t> writeTrans_; // Map to use for headers std::optional<StringToStringMap> readHeaders_; std::optional<StringToStringMap> writeHeaders_; // Won't be cleared when flushing StringToStringMap* extraWriteHeaders_{nullptr}; // If these values are set, they are used instead of looking inside // the header map. folly::Optional<std::chrono::milliseconds> clientTimeout_; folly::Optional<std::chrono::milliseconds> queueTimeout_; folly::Optional<std::chrono::milliseconds> processDelay_; folly::Optional<std::chrono::milliseconds> serverQueueTimeout_; folly::Optional<apache::thrift::concurrency::PRIORITY> priority_; folly::Optional<std::string> clientId_; folly::Optional<std::string> serviceTraceMeta_; static const std::string IDENTITY_HEADER; static const std::string ID_VERSION_HEADER; static const std::string ID_VERSION; bool allowBigFrames_; folly::Optional<CompressionConfig> compressionConfig_; std::shared_ptr<void> routingData_; struct infoIdType { enum idType { // start at 1 to avoid confusing header padding for an infoId KEYVALUE = 1, // for persistent header PKEYVALUE = 2, END // signal the end of infoIds we can handle }; }; // CRC32C of message payload for checksum. folly::Optional<uint32_t> crc32c_; folly::Optional<int64_t> serverLoad_; }; } // namespace transport } // namespace thrift } // namespace apache #endif // #ifndef THRIFT_TRANSPORT_THEADER_H_