thrift/lib/cpp2/async/MultiplexAsyncProcessor.cpp (375 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <thrift/lib/cpp2/async/MultiplexAsyncProcessor.h> #include <optional> #include <string_view> #include <folly/GLog.h> #include <folly/Overload.h> #include <folly/container/F14Map.h> #include <folly/container/F14Set.h> #include <thrift/lib/cpp2/async/AsyncProcessorHelper.h> #include <thrift/lib/cpp2/server/ServerFlags.h> namespace apache::thrift { namespace { std::optional<std::string_view> getInteractionNameFromMethodName( std::string_view methodName) { // Interaction methods are formatted like "{interaction_name}.{method_name}" auto separator = methodName.find('.'); if (separator == std::string_view::npos) { return std::nullopt; } return methodName.substr(0, separator); } struct MetadataImpl final : public AsyncProcessorFactory::MethodMetadata { // Index of the AsyncProcessor to route this method to std::size_t sourceIndex; // The underlying metadata produced by the AsyncProcessorFactory at // sourceIndex std::shared_ptr<const AsyncProcessorFactory::MethodMetadata> inner; explicit MetadataImpl( std::size_t sourceIndex, std::shared_ptr<const AsyncProcessorFactory::MethodMetadata>&& inner) : sourceIndex(sourceIndex), inner(std::move(inner)) {} }; MultiplexAsyncProcessorFactory::CompositionMetadata computeCompositionMetadata( const std::vector<std::shared_ptr<AsyncProcessorFactory>>& processorFactories) { MultiplexAsyncProcessorFactory::CompositionMetadata compositionMetadata; AsyncProcessorFactory::MethodMetadataMap knownMethods; folly::F14FastSet<std::string> seenInteractions; auto addToKnownMethods = [&](AsyncProcessorFactory::MethodMetadataMap& metadataMap, std::size_t sourceIndex) { folly::F14FastSet<std::string_view> newlySeenInteractions; for (auto& [name, metadata] : metadataMap) { if (auto interactionName = getInteractionNameFromMethodName(name)) { if (seenInteractions.contains(*interactionName)) { // The interaction defined earlier gets precedence and no methods // should be added for it from a later AsyncProcessorFactory. This // prevents the latter interaction from being ever created. continue; } newlySeenInteractions.emplace(*interactionName); } // Note that emplace does NOT replace existing entries - methods // from earlier AsyncProcessorFactory's always take precedence. knownMethods.emplace( name, std::make_shared<MetadataImpl>(sourceIndex, std::move(metadata))); } auto oldSize = seenInteractions.size(); (void)oldSize; seenInteractions.insert( newlySeenInteractions.begin(), newlySeenInteractions.end()); // If we implemented it right, seenInteractions and // newlySeenInteractions should be distinct DCHECK( seenInteractions.size() == oldSize + newlySeenInteractions.size()); }; for (std::size_t sourceIndex = 0; sourceIndex < processorFactories.size(); ++sourceIndex) { auto metadataResult = processorFactories[sourceIndex]->createMethodMetadata(); auto* metadataMap = std::get_if<AsyncProcessorFactory::MethodMetadataMap>(&metadataResult); if (metadataMap == nullptr) { // Per our contract, the first wildcard-like processor will swallow up all // methods that are not explicitly named in knownMethods by a preceding // processor. So, as long as we keep track of the first such processor, we // can route all wildcard methods to it in constant time. if (auto* wildcardMetadataMap = std::get_if<AsyncProcessorFactory::WildcardMethodMetadataMap>( &metadataResult)) { addToKnownMethods(wildcardMetadataMap->knownMethods, sourceIndex); compositionMetadata.firstWildcardLike = MultiplexAsyncProcessorFactory::CompositionMetadata::Wildcard{ sourceIndex}; } else { DCHECK(std::holds_alternative< AsyncProcessorFactory::MetadataNotImplemented>(metadataResult)); compositionMetadata.firstWildcardLike = MultiplexAsyncProcessorFactory:: CompositionMetadata::WildcardNoMetadata{sourceIndex}; } break; } addToKnownMethods(*metadataMap, sourceIndex); } compositionMetadata.cachedMethodMetadataResult = [&]() -> AsyncProcessorFactory::CreateMethodMetadataResult { if (compositionMetadata.wildcardIndex().has_value()) { return AsyncProcessorFactory::WildcardMethodMetadataMap{ std::move(knownMethods)}; } return std::move(knownMethods); }(); return compositionMetadata; } class MultiplexAsyncProcessor final : public AsyncProcessor { public: void processSerializedRequest( ResponseChannelRequest::UniquePtr, SerializedRequest&&, protocol::PROTOCOL_TYPES, Cpp2RequestContext*, folly::EventBase*, concurrency::ThreadManager*) override { LOG(FATAL) << "This method should never be called since we implement createMethodMetadata()"; } void processSerializedCompressedRequestWithMetadata( ResponseChannelRequest::UniquePtr req, SerializedCompressedRequest&& serializedRequest, const MethodMetadata& untypedMethodMetadata, protocol::PROTOCOL_TYPES protocolType, Cpp2RequestContext* context, folly::EventBase* eb, concurrency::ThreadManager* tm) override { auto maybeTrackInteraction = [&](AsyncProcessor& processor) { if (auto interactionCreate = context->getInteractionCreate(); interactionCreate.has_value() && *interactionCreate->interactionId_ref() > 0) { if (&processor == defaultInteractionProcessor_) { return; } if (defaultInteractionProcessor_ == nullptr) { // The first to create an interaction gets the privilege of being the // "default" defaultInteractionProcessor_ = &processor; } else { // Note that when there is a duplicate interaction we do not replace // the existing entry. A connection should have unique interaction IDs // which means that the underlying processor should fail (and close // the connection) - it's safe to ignore the duplicated ID. inflightInteractions_.emplace( *interactionCreate->interactionId_ref(), &processor); } } }; const bool isWildcard = AsyncProcessorHelper::isWildcardMethodMetadata(untypedMethodMetadata); if (!isWildcard) { const auto& methodMetadata = AsyncProcessorHelper::expectMetadataOfType<MetadataImpl>( untypedMethodMetadata); DCHECK(methodMetadata.sourceIndex < processors_.size()); auto& processor = *processors_[methodMetadata.sourceIndex]; maybeTrackInteraction(processor); processor.processSerializedCompressedRequestWithMetadata( std::move(req), std::move(serializedRequest), *methodMetadata.inner, protocolType, context, eb, tm); return; } const auto& wildcardMethodMetadata = untypedMethodMetadata; folly::variant_match( compositionMetadata_.firstWildcardLike, [](std::monostate) { LOG(FATAL) << "Received WildcardMethodMetadata but expected no WildcardMethodMetadataMap was composed"; }, [&](const MultiplexAsyncProcessorFactory::CompositionMetadata::Wildcard& wildcard) { DCHECK(wildcard.index < processors_.size()); auto& processor = *processors_[wildcard.index]; maybeTrackInteraction(processor); processor.processSerializedCompressedRequestWithMetadata( std::move(req), std::move(serializedRequest), wildcardMethodMetadata, protocolType, context, eb, tm); }, [&](const MultiplexAsyncProcessorFactory::CompositionMetadata:: WildcardNoMetadata& wildcard) { DCHECK(wildcard.index < processors_.size()); auto& processor = *processors_[wildcard.index]; maybeTrackInteraction(processor); processor.processSerializedCompressedRequest( std::move(req), std::move(serializedRequest), protocolType, context, eb, tm); }); } void getServiceMetadata( metadata::ThriftServiceMetadataResponse& actualResponse) override { auto copyMetadataInto = [](metadata::ThriftMetadata& dst, const metadata::ThriftServiceMetadataResponse& src) { // The multiplexed services makes reference to a composed service, so // we need to include it in the metadata. auto& metadata = *src.metadata_ref(); dst.enums_ref()->insert( metadata.enums_ref()->begin(), metadata.enums_ref()->end()); dst.structs_ref()->insert( metadata.structs_ref()->begin(), metadata.structs_ref()->end()); dst.exceptions_ref()->insert( metadata.exceptions_ref()->begin(), metadata.exceptions_ref()->end()); dst.services_ref()->insert( metadata.services_ref()->begin(), metadata.services_ref()->end()); }; for (auto& processor : processors_) { metadata::ThriftServiceMetadataResponse response; processor->getServiceMetadata(response); // Copying gives precedence to earlier inserted entries - this matches // the semantics of the processor method delegation. copyMetadataInto(*actualResponse.metadata_ref(), response); actualResponse.services_ref()->insert( actualResponse.services_ref()->end(), response.services_ref()->begin(), response.services_ref()->end()); } // The underlying AsyncProcessor::getServiceMetadata may not be implemented // if using: a service compiled without metadata support; or a custom // AsyncProcessor implementation if (actualResponse.services_ref()->empty()) { return; } // TODO(praihan): Remove context field from metadata response object // There is no "correct" way to set context for MultiplexAsyncProcessor. // Our best guess would be the first service, which is likely to be most // relevant to the user. const auto& defaultServiceContextRef = actualResponse.services_ref()->front(); actualResponse.context_ref()->service_info_ref() = actualResponse.metadata_ref()->services_ref()->at( *defaultServiceContextRef.service_name_ref()); actualResponse.context_ref()->module_ref() = *defaultServiceContextRef.module_ref(); } void terminateInteraction( int64_t id, Cpp2ConnContext& ctx, folly::EventBase& eb) noexcept override { AsyncProcessor* processor = nullptr; if (auto foundProcessor = folly::get_ptr(inflightInteractions_, id)) { processor = *foundProcessor; } else { // The first processor serves the niche of being the "default" interaction // creator. We don't put its IDs in the map. processor = defaultInteractionProcessor_; } if (processor != nullptr) { processor->terminateInteraction(id, ctx, eb); } } void destroyAllInteractions( Cpp2ConnContext& ctx, folly::EventBase& eb) noexcept override { inflightInteractions_.clear(); for (auto& processor : processors_) { processor->destroyAllInteractions(ctx, eb); } } explicit MultiplexAsyncProcessor( std::vector<std::unique_ptr<AsyncProcessor>>&& processors, const MultiplexAsyncProcessorFactory::CompositionMetadata& compositionMetadata) : processors_(std::move(processors)), compositionMetadata_(compositionMetadata) { DCHECK(!processors_.empty()); } void executeRequest( ServerRequest&& request, const MethodMetadata& methodMetadata) override { auto [processor, metadata] = derefProcessor(methodMetadata); processor->executeRequest(std::move(request), *metadata); return; } std::pair<AsyncProcessor*, const AsyncProcessorFactory::MethodMetadata*> getRequestsProcessor( const AsyncProcessorFactory::MethodMetadata& methodMetadata) override { return derefProcessor(methodMetadata); } private: const std::vector<std::unique_ptr<AsyncProcessor>> processors_; const MultiplexAsyncProcessorFactory::CompositionMetadata& compositionMetadata_; folly::F14FastMap<std::int64_t, AsyncProcessor*> inflightInteractions_; // The "default" processor is expected to consume a disproportionately large // chunk of the traffic. To save memory, we avoid tracking interactions in the // map for the first processor that creates an interaction. AsyncProcessor* defaultInteractionProcessor_{nullptr}; std::pair<AsyncProcessor*, const AsyncProcessorFactory::MethodMetadata*> derefProcessor(const MethodMetadata& methodMetadata) const { if (AsyncProcessorHelper::isWildcardMethodMetadata(methodMetadata)) { return std::make_pair( processors_[compositionMetadata_.wildcardIndex().value()].get(), &methodMetadata); } const auto& multiplexMethodMetadata = AsyncProcessorHelper::expectMetadataOfType<MetadataImpl>( methodMetadata); return std::make_pair( processors_[multiplexMethodMetadata.sourceIndex].get(), multiplexMethodMetadata.inner.get()); } }; } // namespace MultiplexAsyncProcessorFactory::MultiplexAsyncProcessorFactory( std::vector<std::shared_ptr<AsyncProcessorFactory>> processorFactories) : processorFactories_( flattenProcessorFactories(std::move(processorFactories))), compositionMetadata_(computeCompositionMetadata(processorFactories_)) { CHECK(!processorFactories_.empty()); for (auto& processorFactory : processorFactories_) { auto requestInfoMap = processorFactory->getServiceRequestInfoMap(); if (requestInfoMap) { serviceRequestInfoMap_.insert( requestInfoMap->get().begin(), requestInfoMap->get().end()); } } } /* static */ std::vector<std::shared_ptr<AsyncProcessorFactory>> MultiplexAsyncProcessorFactory::flattenProcessorFactories( std::vector<std::shared_ptr<AsyncProcessorFactory>> processorFactories) { std::vector<std::shared_ptr<AsyncProcessorFactory>> result; for (auto& factory : processorFactories) { auto* multiplexFactory = dynamic_cast<MultiplexAsyncProcessorFactory*>(factory.get()); if (multiplexFactory == nullptr) { result.emplace_back(std::move(factory)); } else { result.insert( result.end(), std::make_move_iterator( multiplexFactory->processorFactories_.begin()), std::make_move_iterator(multiplexFactory->processorFactories_.end())); } } return result; } std::unique_ptr<AsyncProcessor> MultiplexAsyncProcessorFactory::getProcessor() { return getProcessorWithUnderlyingModifications({} /* modifier */); } std::unique_ptr<AsyncProcessor> MultiplexAsyncProcessorFactory::getProcessorWithUnderlyingModifications( folly::FunctionRef<void(AsyncProcessor&)> modifier) { std::vector<std::unique_ptr<AsyncProcessor>> processors; processors.reserve(processorFactories_.size()); for (auto& processorFactory : processorFactories_) { auto processor = processorFactory->getProcessor(); if (modifier) { modifier(*processor); } processors.emplace_back(std::move(processor)); } return std::make_unique<MultiplexAsyncProcessor>( std::move(processors), compositionMetadata_); } AsyncProcessorFactory::CreateMethodMetadataResult MultiplexAsyncProcessorFactory::createMethodMetadata() { return compositionMetadata_.cachedMethodMetadataResult; } std::shared_ptr<folly::RequestContext> MultiplexAsyncProcessorFactory::getBaseContextForRequest( const MethodMetadata& untypedMethodMetadata) { const bool isWildcard = AsyncProcessorHelper::isWildcardMethodMetadata(untypedMethodMetadata); if (!isWildcard) { const auto& methodMetadata = AsyncProcessorHelper::expectMetadataOfType<MetadataImpl>( untypedMethodMetadata); return processorFactories_[methodMetadata.sourceIndex] ->getBaseContextForRequest(*methodMetadata.inner); } const auto& wildcardMethodMetadata = untypedMethodMetadata; auto wildcardIndex = compositionMetadata_.wildcardIndex(); DCHECK(wildcardIndex.has_value()) << "Received WildcardMethodMetadata but expected no WildcardMethodMetadataMap was composed"; return processorFactories_[*wildcardIndex]->getBaseContextForRequest( wildcardMethodMetadata); } std::vector<ServiceHandlerBase*> MultiplexAsyncProcessorFactory::getServiceHandlers() { std::vector<ServiceHandlerBase*> result; for (auto& processorFactory : processorFactories_) { auto newHandlers = processorFactory->getServiceHandlers(); result.insert(result.end(), newHandlers.begin(), newHandlers.end()); } return result; } std::optional<std::size_t> MultiplexAsyncProcessorFactory::CompositionMetadata::wildcardIndex() const { using Result = std::optional<std::size_t>; return folly::variant_match( firstWildcardLike, [](std::monostate) -> Result { return std::nullopt; }, [](auto&& wildcard) -> Result { return wildcard.index; }); } std::optional<std::reference_wrapper<ServiceRequestInfoMap const>> MultiplexAsyncProcessorFactory::getServiceRequestInfoMap() const { if (serviceRequestInfoMap_.size()) { return std::cref(serviceRequestInfoMap_); } return std::nullopt; } } // namespace apache::thrift