thrift/lib/cpp2/async/AsyncProcessorHelper.cpp (84 lines of code) (raw):

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <thrift/lib/cpp2/async/AsyncProcessorHelper.h>

#include <fmt/core.h>

#include <thrift/lib/cpp/TApplicationException.h>

namespace apache::thrift {

// Replies to `request` with a TApplicationException of type UNKNOWN_METHOD
// whose message is "Method name <methodName> not found", tagged with
// kMethodUnknownErrorCode.
/* static */ void AsyncProcessorHelper::sendUnknownMethodError(
    ResponseChannelRequest::UniquePtr request, std::string_view methodName) {
  auto message = fmt::format("Method name {} not found", methodName);
  request->sendErrorWrapped(
      folly::make_exception_wrapper<TApplicationException>(
          TApplicationException::UNKNOWN_METHOD, std::move(message)),
      kMethodUnknownErrorCode);
}

// Runs a previously queued `serverRequest` on its AsyncProcessor.
//
// - If the request carries a requestInfo, a ContextStack is obtained from the
//   processor (when one is returned, the read hooks are fired and the stack is
//   attached to the request) and the request is handed to
//   AsyncProcessor::executeRequest together with its method metadata. Any
//   std::exception thrown along the way is converted into an INTERNAL_ERROR
//   reply that is sent from the request's EventBase thread.
// - If there is no requestInfo, an unknown-method error is sent from the
//   request's EventBase thread.
/* static */ void AsyncProcessorHelper::executeRequest(
    ServerRequest&& serverRequest) {
  // Since this request was queued, reset the processBegin
  // time to the actual start time, and not the queue time.
  auto ctx = serverRequest.requestContext();
  if (ctx->getTimestamps().getSamplingStatus().isEnabled()) {
    ctx->getTimestamps().processBegin = std::chrono::steady_clock::now();
  }

  auto ap = detail::ServerRequestHelper::asyncProcessor(serverRequest);
  AsyncProcessorFactory::MethodMetadata const& metadata =
      *serverRequest.methodMetadata();
  folly::RequestContextScopeGuard rctx(serverRequest.follyRequestContext());

  if (auto requestInfo = serverRequest.requestInfo()) {
    // Currently we will only execute requests with a valid requestInfo. We will
    // have to modify/relax this restriction to handle wildcard services.
    try {
      std::unique_ptr<ContextStack> ctxStack(ap->getContextStack(
          ap->getServiceName(),
          requestInfo->functionName_deprecated,
          serverRequest.requestContext()));
      if (ctxStack) {
        ctxStack->preRead();

        SerializedMessage smsg;
        smsg.protocolType =
            detail::ServerRequestHelper::protocol(serverRequest);
        // This provides the compressed data which is not the previous behavior.
        // TODO: T112295873
        smsg.buffer =
            detail::ServerRequestHelper::compressedRequest(serverRequest)
                .compressedBuffer();
        smsg.methodName = requestInfo->functionName_deprecated;
        ctxStack->onReadData(smsg);

        // Provide the length of the compressed data
        // TODO: T112295873
        ctxStack->postRead(
            nullptr,
            detail::ServerRequestHelper::compressedRequest(serverRequest)
                .compressedBuffer()
                ->computeChainDataLength());

        detail::ServerRequestHelper::setContextStack(
            serverRequest, std::move(ctxStack));
      }
      ap->executeRequest(std::move(serverRequest), metadata);
    } catch (const std::exception&) {
      // Temporary code - just ensure that a failure produces an error.
      // TODO: T113039894
      //
      // The caught exception is intentionally not forwarded: the client always
      // receives the fixed INTERNAL_ERROR message below.
      //
      // NOTE(review): serverRequest was passed through std::move() to
      // ap->executeRequest() above. If that call can throw after it has already
      // taken ownership, the accesses below read a moved-from object - confirm
      // against the AsyncProcessor::executeRequest contract.
      auto eb = detail::ServerRequestHelper::eventBase(serverRequest);
      auto req = detail::ServerRequestHelper::request(std::move(serverRequest));
      eb->runInEventBaseThread([request = std::move(req)]() {
        request->sendErrorWrapped(
            folly::make_exception_wrapper<TApplicationException>(
                TApplicationException::INTERNAL_ERROR,
                "AsyncProcessorHelper::executeRequest - resource pools mode"),
            kUnknownErrorCode);
      });
      return;
    }
  } else {
    auto eb = detail::ServerRequestHelper::eventBase(serverRequest);
    eb->runInEventBaseThread(
        [serverRequest = std::move(serverRequest)]() mutable {
          // Read the method name before the request is moved out of
          // serverRequest on the next statement.
          auto methodName = serverRequest.requestContext()->getMethodName();
          sendUnknownMethodError(
              detail::ServerRequestHelper::request(std::move(serverRequest)),
              methodName);
        });
  }
}

// Chooses the resource pool for `request`: the default sync pool when the
// request has a requestInfo flagged isSync, the default async pool in every
// other case (including when no requestInfo is present). The method metadata
// argument is not consulted.
/* static */ SelectPoolResult AsyncProcessorHelper::selectResourcePool(
    const ServerRequest& request,
    const AsyncProcessorFactory::MethodMetadata&) {
  if (auto requestInfo = request.requestInfo();
      requestInfo && requestInfo->isSync) {
    return std::ref(ResourcePoolHandle::defaultSync());
  }
  return std::ref(ResourcePoolHandle::defaultAsync());
}

} // namespace apache::thrift