proxygen/lib/http/codec/HQMultiCodec.h (206 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. */ #pragma once #include <proxygen/lib/http/codec/HQControlCodec.h> #include <proxygen/lib/http/codec/HQStreamCodec.h> #include <proxygen/lib/http/codec/compress/QPACKCodec.h> namespace proxygen { namespace hq { class HQMultiCodec : public HQControlCodec { public: explicit HQMultiCodec(TransportDirection direction) : HQControlCodec(HTTPCodec::MaxStreamID, direction, StreamDirection::INGRESS, /* to match settings */ ingressSettings_, UnidirectionalStreamType::CONTROL) { VLOG(4) << "creating " << getTransportDirectionString(direction) << " HQMultiCodec for stream " << streamId_; // Has to be explicitly enabled doubleGoaway_ = false; minUnseenStreamID_ = 0; minUnseenPushID_ = 0; } ~HQMultiCodec() override = default; void setControlStreamID(StreamID controlID) { streamId_ = controlID; } void setQPACKEncoderMaxDataFn(std::function<uint64_t()> qpackEncoderMaxData) { qpackEncoderMaxDataFn_ = std::move(qpackEncoderMaxData); } bool setCurrentStream(StreamID currentStream) { if (codecs_.find(currentStream) == codecs_.end()) { return false; } currentStream_ = currentStream; return true; } bool isStreamIngressEgressAllowed(StreamID streamId) const { CHECK(transportDirection_ == TransportDirection::DOWNSTREAM); return streamId < egressGoawayAck_; } HTTPCodec& addCodec(StreamID streamId) { if (transportDirection_ == TransportDirection::DOWNSTREAM && (streamId & 0x3) == 0 && streamId >= minUnseenStreamID_) { CHECK_LT(streamId, egressGoawayAck_) << "Don't addCodec for refused stream"; // only bump for client initiated bidi streams, for now minUnseenStreamID_ = streamId + 4; } auto res = codecs_.emplace(streamId, std::make_unique<HQStreamCodec>(streamId, transportDirection_, qpackCodec_, qpackEncoderWriteBuf_, qpackDecoderWriteBuf_, qpackEncoderMaxDataFn_, settings_)); auto& codec = res.first->second; codec->setCallback(callback_); return *codec; } void removeCodec(StreamID streamId) { codecs_.erase(streamId); } void setResumeHook(StreamID streamId, folly::Function<void()> hook = nullptr) { getCodec(streamId).setResumeHook(std::move(hook)); } QPACKCodec& getQPACKCodec() { return qpackCodec_; } folly::IOBufQueue& getQPACKEncoderWriteBuf() { return qpackEncoderWriteBuf_; } folly::IOBufQueue& getQPACKDecoderWriteBuf() { return qpackDecoderWriteBuf_; } void encodeCancelStream(quic::StreamId id) { auto cancel = qpackCodec_.encodeCancelStream(id); qpackDecoderWriteBuf_.append(std::move(cancel)); } bool encodeInsertCountIncrement() { auto ici = qpackCodec_.encodeInsertCountInc(); if (ici) { qpackDecoderWriteBuf_.append(std::move(ici)); return true; } return false; } void setCallback(proxygen::HTTPCodec::Callback* callback) override { HQControlCodec::setCallback(callback); for (const auto& codec : codecs_) { codec.second->setCallback(callback); } } const std::string& getUserAgent() const override { // TODO static const std::string empty; return empty; } size_t onIngress(const folly::IOBuf& buf) override { auto res = getCurrentCodec().onIngress(buf); currentStream_ = HTTPCodec::MaxStreamID; return res; } void onIngressEOF() override { getCurrentCodec().onIngressEOF(); currentStream_ = HTTPCodec::MaxStreamID; } bool isReusable() const override { return !sentGoaway_; } bool isParserPaused() const override { auto res = getCurrentCodec().isParserPaused(); currentStream_ = HTTPCodec::MaxStreamID; return res; } bool supportsParallelRequests() const override { return true; } size_t generateConnectionPreface(folly::IOBufQueue& /*writeBuf*/) override { return 0; } size_t generateSettingsAck(folly::IOBufQueue& /*writeBuf*/) override { return 0; } // It is possible to make HQStreamCodec egress stateless and avoid the // hashtable lookup in the generate* functions. void generateHeader( folly::IOBufQueue& writeBuf, StreamID stream, const HTTPMessage& msg, bool eom = false, HTTPHeaderSize* size = nullptr, const folly::Optional<HTTPHeaders>& extraHeaders = folly::none) override { getCodec(stream).generateHeader( writeBuf, stream, msg, eom, size, extraHeaders); } void generatePushPromise(folly::IOBufQueue& writeBuf, StreamID stream, const HTTPMessage& msg, StreamID pushID, bool eom = false, HTTPHeaderSize* size = nullptr) override { getCodec(stream).generatePushPromise( writeBuf, stream, msg, pushID, eom, size); } size_t generateBody(folly::IOBufQueue& writeBuf, StreamID stream, std::unique_ptr<folly::IOBuf> chain, folly::Optional<uint8_t> padding, bool eom) override { return getCodec(stream).generateBody( writeBuf, stream, std::move(chain), padding, eom); } size_t generateTrailers(folly::IOBufQueue& writeBuf, StreamID stream, const HTTPHeaders& trailers) override { return getCodec(stream).generateTrailers(writeBuf, stream, trailers); } size_t generateEOM(folly::IOBufQueue& writeBuf, StreamID stream) override { return getCodec(stream).generateEOM(writeBuf, stream); } CompressionInfo getCompressionInfo() const override { return qpackCodec_.getCompressionInfo(); } // HTTPCodec API uint32_t getDefaultWindowSize() const override { return std::numeric_limits<uint32_t>::max(); } HTTPSettings* getEgressSettings() override { return &egressSettings_; } uint64_t nextPushID() { CHECK_EQ(transportDirection_, TransportDirection::DOWNSTREAM); return nextPushID_++; } void onIngressPushId(uint64_t pushId) { minUnseenPushID_ = std::max(minUnseenPushID_, pushId + 1); } protected: HTTPCodec& getCurrentCodec() { return getCodec(currentStream_); } const HTTPCodec& getCurrentCodec() const { return getCodec(currentStream_); } HQStreamCodec& getCodec(StreamID stream) { auto it = codecs_.find(stream); CHECK(it != codecs_.end()) << "stream=" << stream; return *it->second; } const HQStreamCodec& getCodec(StreamID stream) const { auto it = codecs_.find(stream); CHECK(it != codecs_.end()) << "stream=" << stream; return *it->second; } HTTPSettings ingressSettings_; // Turn peer's QPACK dynamic table on by default HTTPSettings egressSettings_{ {SettingsId::HEADER_TABLE_SIZE, kDefaultEgressHeaderTableSize}, {SettingsId::MAX_HEADER_LIST_SIZE, kDefaultEgressMaxHeaderListSize}, {SettingsId::_HQ_QPACK_BLOCKED_STREAMS, hq::kDefaultEgressQpackBlockedStream}}; mutable StreamID currentStream_{HTTPCodec::MaxStreamID}; folly::F14FastMap<StreamID, std::unique_ptr<HQStreamCodec>> codecs_; QPACKCodec qpackCodec_; folly::IOBufQueue qpackEncoderWriteBuf_{ folly::IOBufQueue::cacheChainLength()}; folly::IOBufQueue qpackDecoderWriteBuf_{ folly::IOBufQueue::cacheChainLength()}; std::function<uint64_t()> qpackEncoderMaxDataFn_; uint64_t nextPushID_{0}; }; }} // namespace proxygen::hq