thrift/lib/cpp2/transport/rocket/server/ThriftRocketServerHandler.cpp (721 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <thrift/lib/cpp2/transport/rocket/server/ThriftRocketServerHandler.h> #include <memory> #include <utility> #include <fmt/core.h> #include <folly/ExceptionString.h> #include <folly/ExceptionWrapper.h> #include <folly/GLog.h> #include <folly/Overload.h> #include <folly/io/Cursor.h> #include <folly/io/IOBuf.h> #include <thrift/lib/cpp/TApplicationException.h> #include <thrift/lib/cpp2/Flags.h> #include <thrift/lib/cpp2/async/AsyncProcessorHelper.h> #include <thrift/lib/cpp2/async/ResponseChannel.h> #include <thrift/lib/cpp2/protocol/CompactProtocol.h> #include <thrift/lib/cpp2/server/Cpp2ConnContext.h> #include <thrift/lib/cpp2/server/Cpp2Worker.h> #include <thrift/lib/cpp2/server/LoggingEventHelper.h> #include <thrift/lib/cpp2/server/MonitoringMethodNames.h> #include <thrift/lib/cpp2/transport/core/ThriftRequest.h> #include <thrift/lib/cpp2/transport/rocket/PayloadUtils.h> #include <thrift/lib/cpp2/transport/rocket/RocketException.h> #include <thrift/lib/cpp2/transport/rocket/framing/ErrorCode.h> #include <thrift/lib/cpp2/transport/rocket/framing/Frames.h> #include <thrift/lib/cpp2/transport/rocket/server/RocketServerConnection.h> #include <thrift/lib/cpp2/transport/rocket/server/RocketServerFrameContext.h> #include <thrift/lib/cpp2/transport/rocket/server/RocketSinkClientCallback.h> #include <thrift/lib/cpp2/transport/rocket/server/RocketStreamClientCallback.h> #include <thrift/lib/cpp2/transport/rocket/server/RocketThriftRequests.h> #include <thrift/lib/cpp2/util/Checksum.h> #include <thrift/lib/thrift/gen-cpp2/RpcMetadata_constants.h> namespace { const int64_t kRocketServerMaxVersion = 9; const int64_t kRocketServerMinVersion = 6; } // namespace THRIFT_FLAG_DEFINE_bool(rocket_server_legacy_protocol_key, true); THRIFT_FLAG_DEFINE_int64(rocket_server_max_version, kRocketServerMaxVersion); THRIFT_FLAG_DEFINE_int64(monitoring_over_user_logging_sample_rate, 0); namespace apache { namespace thrift { namespace rocket { thread_local uint32_t ThriftRocketServerHandler::sample_{0}; namespace { bool isMetadataValid(const RequestRpcMetadata& metadata, RpcKind expectedKind) { return metadata.protocol_ref() && metadata.name_ref() && metadata.kind_ref() && metadata.kind_ref() == expectedKind; } } // namespace ThriftRocketServerHandler::ThriftRocketServerHandler( std::shared_ptr<Cpp2Worker> worker, const folly::SocketAddress& clientAddress, const folly::AsyncTransport* transport, const std::vector<std::unique_ptr<SetupFrameHandler>>& handlers) : worker_(std::move(worker)), connectionGuard_(worker_->getActiveRequestsGuard()), connContext_( &clientAddress, transport, nullptr, /* eventBaseManager */ nullptr, /* duplexChannel */ nullptr, /* x509PeerCert */ worker_->getServer()->getClientIdentityHook(), worker_.get()), setupFrameHandlers_(handlers), version_(static_cast<int32_t>(std::min( kRocketServerMaxVersion, THRIFT_FLAG(rocket_server_max_version)))) { connContext_.setTransportType(Cpp2ConnContext::TransportType::ROCKET); for (const auto& handler : worker_->getServer()->getEventHandlersUnsafe()) { handler->newConnection(&connContext_); } } ThriftRocketServerHandler::~ThriftRocketServerHandler() { for (const auto& handler : worker_->getServer()->getEventHandlersUnsafe()) { handler->connectionDestroyed(&connContext_); } // Ensure each connAccepted() call has a matching connClosed() if (auto* observer = worker_->getServer()->getObserver()) { observer->connClosed(); } } apache::thrift::server::TServerObserver::SamplingStatus ThriftRocketServerHandler::shouldSample() { bool isServerSamplingEnabled = (sampleRate_ > 0) && ((sample_++ % sampleRate_) == 0); // TODO: determine isClientSamplingEnabled by "client_logging_enabled" header return apache::thrift::server::TServerObserver::SamplingStatus( isServerSamplingEnabled, false); } void ThriftRocketServerHandler::handleSetupFrame( SetupFrame&& frame, RocketServerConnection& connection) { if (!frame.payload().hasNonemptyMetadata()) { return connection.close(folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, "Missing required metadata in SETUP frame")); } folly::io::Cursor cursor(frame.payload().buffer()); // Validate Thrift protocol key uint32_t protocolKey; const bool success = cursor.tryReadBE<uint32_t>(protocolKey); constexpr uint32_t kLegacyRocketProtocolKey = 1; if (!success || ((!THRIFT_FLAG(rocket_server_legacy_protocol_key) || protocolKey != kLegacyRocketProtocolKey) && protocolKey != RpcMetadata_constants::kRocketProtocolKey())) { if (!frame.rocketMimeTypes()) { return connection.close(folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, "Incompatible Thrift version")); } // If Rocket MIME types are used the protocol key is optional. if (success) { cursor.retreat(4); } } try { CompactProtocolReader reader; reader.setInput(cursor); RequestSetupMetadata meta; // Throws on read error meta.read(&reader); if (reader.getCursorPosition() > frame.payload().metadataSize()) { return connection.close(folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, "Error deserializing SETUP payload: underflow")); } connContext_.readSetupMetadata(meta); auto minVersion = meta.minVersion_ref().value_or(0); auto maxVersion = meta.maxVersion_ref().value_or(0); THRIFT_CONNECTION_EVENT(rocket.setup).log(connContext_, [&] { return folly::dynamic::object("client_min_version", minVersion)( "client_max_version", maxVersion)("server_version", version_); }); if (minVersion > version_) { return connection.close(folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, "Incompatible Rocket version")); } if (maxVersion < kRocketServerMinVersion) { return connection.close(folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, "Incompatible Rocket version")); } version_ = std::min(version_, maxVersion); if (version_ >= 9 && !frame.rocketMimeTypes()) { return connection.close(folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, "Unsupported MIME types")); } eventBase_ = connContext_.getTransport()->getEventBase(); for (const auto& h : setupFrameHandlers_) { auto processorInfo = h->tryHandle(meta); if (processorInfo) { bool valid = true; processorFactory_ = std::addressof(processorInfo->processorFactory_); serviceMetadata_ = std::addressof(worker_->getMetadataForService(*processorFactory_)); serviceRequestInfoMap_ = processorFactory_->getServiceRequestInfoMap(); valid &= !!(processor_ = processorFactory_->getProcessor()); // Allow no thread manager if resource pools in use valid &= (!!(threadManager_ = std::move(processorInfo->threadManager_)) || useResourcePoolsFlagsSet()); valid &= !!(serverConfigs_ = &processorInfo->serverConfigs_); requestsRegistry_ = processorInfo->requestsRegistry_ != nullptr ? processorInfo->requestsRegistry_ : worker_->getRequestsRegistry(); if (!valid) { return connection.close( folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, "Error in implementation of custom connection handler.")); } return; } } // no custom frame handler was found, do the default processorFactory_ = std::addressof(worker_->getServer()->getDecoratedProcessorFactory()); serviceMetadata_ = std::addressof(worker_->getMetadataForService(*processorFactory_)); serviceRequestInfoMap_ = processorFactory_->getServiceRequestInfoMap(); processor_ = processorFactory_->getProcessor(); if (!useResourcePoolsFlagsSet()) { threadManager_ = worker_->getServer()->getThreadManager(); } serverConfigs_ = worker_->getServer(); requestsRegistry_ = worker_->getRequestsRegistry(); // add sampleRate if (serverConfigs_) { if (auto* observer = serverConfigs_->getObserver()) { sampleRate_ = observer->getSampleRate(); } } if (meta.dscpToReflect_ref() || meta.markToReflect_ref()) { connection.applyDscpAndMarkToSocket(meta); } DCHECK_GE(version_, 5); ServerPushMetadata serverMeta; serverMeta.set_setupResponse(); serverMeta.setupResponse_ref()->version_ref() = version_; serverMeta.setupResponse_ref()->zstdSupported_ref() = true; CompactProtocolWriter compactProtocolWriter; folly::IOBufQueue queue; compactProtocolWriter.setOutput(&queue); serverMeta.write(&compactProtocolWriter); connection.sendMetadataPush(std::move(queue).move()); } catch (const std::exception& e) { return connection.close(folly::make_exception_wrapper<RocketException>( ErrorCode::INVALID_SETUP, fmt::format( "Error deserializing SETUP payload: {}", folly::exceptionStr(e).toStdString()))); } } void ThriftRocketServerHandler::handleRequestResponseFrame( RequestResponseFrame&& frame, RocketServerFrameContext&& context) { auto makeRequestResponse = [&](RequestRpcMetadata&& md, rocket::Payload&& debugPayload, std::shared_ptr<folly::RequestContext> ctx) { // Note, we're passing connContext by reference and rely on the next // chain of ownership to keep it alive: ThriftServerRequestResponse // stores RocketServerFrameContext, which keeps refcount on // RocketServerConnection, which in turn keeps ThriftRocketServerHandler // alive, which in turn keeps connContext_ alive. return RequestsRegistry::makeRequest<ThriftServerRequestResponse>( *eventBase_, *serverConfigs_, std::move(md), connContext_, std::move(ctx), *requestsRegistry_, std::move(debugPayload), std::move(context), version_); }; handleRequestCommon( std::move(frame.payload()), std::move(makeRequestResponse), RpcKind::SINGLE_REQUEST_SINGLE_RESPONSE); } void ThriftRocketServerHandler::handleRequestFnfFrame( RequestFnfFrame&& frame, RocketServerFrameContext&& context) { auto makeRequestFnf = [&](RequestRpcMetadata&& md, rocket::Payload&& debugPayload, std::shared_ptr<folly::RequestContext> ctx) { // Note, we're passing connContext by reference and rely on a complex // chain of ownership (see handleRequestResponseFrame for detailed // explanation). return RequestsRegistry::makeRequest<ThriftServerRequestFnf>( *eventBase_, *serverConfigs_, std::move(md), connContext_, std::move(ctx), *requestsRegistry_, std::move(debugPayload), std::move(context), [keepAlive = processor_] {}); }; handleRequestCommon( std::move(frame.payload()), std::move(makeRequestFnf), RpcKind::SINGLE_REQUEST_NO_RESPONSE); } void ThriftRocketServerHandler::handleRequestStreamFrame( RequestStreamFrame&& frame, RocketServerFrameContext&& context, RocketStreamClientCallback* clientCallback) { auto makeRequestStream = [&](RequestRpcMetadata&& md, rocket::Payload&& debugPayload, std::shared_ptr<folly::RequestContext> ctx) { return RequestsRegistry::makeRequest<ThriftServerRequestStream>( *eventBase_, *serverConfigs_, std::move(md), connContext_, std::move(ctx), *requestsRegistry_, std::move(debugPayload), std::move(context), version_, clientCallback, processor_); }; handleRequestCommon( std::move(frame.payload()), std::move(makeRequestStream), RpcKind::SINGLE_REQUEST_STREAMING_RESPONSE); } void ThriftRocketServerHandler::handleRequestChannelFrame( RequestChannelFrame&& frame, RocketServerFrameContext&& context, RocketSinkClientCallback* clientCallback) { auto makeRequestSink = [&](RequestRpcMetadata&& md, rocket::Payload&& debugPayload, std::shared_ptr<folly::RequestContext> ctx) { return RequestsRegistry::makeRequest<ThriftServerRequestSink>( *eventBase_, *serverConfigs_, std::move(md), connContext_, std::move(ctx), *requestsRegistry_, std::move(debugPayload), std::move(context), version_, clientCallback, processor_); }; handleRequestCommon( std::move(frame.payload()), std::move(makeRequestSink), RpcKind::SINK); } void ThriftRocketServerHandler::connectionClosing() { connContext_.connectionClosed(); if (processor_) { processor_->destroyAllInteractions(connContext_, *eventBase_); } } template <class F> void ThriftRocketServerHandler::handleRequestCommon( Payload&& payload, F&& makeRequest, RpcKind expectedKind) { // setup request sampling for counters and stats auto samplingStatus = shouldSample(); std::chrono::steady_clock::time_point readEnd; if (UNLIKELY(samplingStatus.isEnabled())) { readEnd = std::chrono::steady_clock::now(); } rocket::Payload debugPayload = payload.clone(); auto requestPayloadTry = unpackAsCompressed<RequestPayload>(std::move(payload)); auto makeActiveRequest = [&](auto&& md, auto&& payload, auto&& reqCtx) { serverConfigs_->incActiveRequests(); return makeRequest(std::move(md), std::move(payload), std::move(reqCtx)); }; auto createDefaultRequestContext = [&] { return std::make_shared<folly::RequestContext>( requestsRegistry_->genRootId()); }; if (requestPayloadTry.hasException()) { handleDecompressionFailure( makeActiveRequest( RequestRpcMetadata(), rocket::Payload{}, createDefaultRequestContext()), requestPayloadTry.exception().what().toStdString()); return; } auto& data = requestPayloadTry->payload; auto& metadata = requestPayloadTry->metadata; if (!isMetadataValid(metadata, expectedKind)) { handleRequestWithBadMetadata(makeActiveRequest( std::move(metadata), std::move(debugPayload), createDefaultRequestContext())); return; } if (worker_->isStopping()) { handleServerShutdown(makeActiveRequest( std::move(metadata), std::move(debugPayload), createDefaultRequestContext())); return; } THRIFT_APPLICATION_EVENT(server_read_headers).log([&] { auto size = metadata.otherMetadata_ref() ? metadata.otherMetadata_ref()->size() : 0; std::vector<folly::dynamic> keys; if (size) { keys.reserve(size); for (auto& [k, v] : *metadata.otherMetadata_ref()) { keys.push_back(k); } } int fmd_sz = 0; if (auto fmd = metadata.frameworkMetadata_ref()) { DCHECK(*fmd) << "initialized IOBuf is null"; fmd_sz = (**fmd).computeChainDataLength(); } return folly::dynamic::object("size", size) // ("keys", folly::dynamic::array(std::move(keys))) // ("frameworkMetadataSize", fmd_sz); }); if (metadata.crc32c_ref()) { try { if (auto compression = metadata.compression_ref()) { data = uncompressBuffer(std::move(data), *compression); } } catch (...) { handleDecompressionFailure( makeActiveRequest( std::move(metadata), rocket::Payload{}, createDefaultRequestContext()), folly::exceptionStr(std::current_exception()).toStdString()); return; } } // check the checksum const bool badChecksum = metadata.crc32c_ref() && (*metadata.crc32c_ref() != checksum::crc32c(*data)); if (badChecksum) { handleRequestWithBadChecksum(makeActiveRequest( std::move(metadata), std::move(debugPayload), createDefaultRequestContext())); return; } if (auto injectedFailure = worker_->getServer()->maybeInjectFailure(); injectedFailure != ThriftServer::InjectedFailure::NONE) { InjectedFault injectedFault; switch (injectedFailure) { case ThriftServer::InjectedFailure::NONE: folly::assume_unreachable(); case ThriftServer::InjectedFailure::ERROR: injectedFault = InjectedFault::ERROR; break; case ThriftServer::InjectedFailure::DROP: injectedFault = InjectedFault::DROP; break; case ThriftServer::InjectedFailure::DISCONNECT: injectedFault = InjectedFault::DISCONNECT; break; } handleInjectedFault( makeActiveRequest( std::move(metadata), std::move(debugPayload), createDefaultRequestContext()), injectedFault); return; } using PerServiceMetadata = Cpp2Worker::PerServiceMetadata; const PerServiceMetadata::FindMethodResult methodMetadataResult = serviceMetadata_->findMethod( metadata.name_ref() ? metadata.name_ref()->view() : std::string_view{}); // need to call with empty string_view // because we still distinguish // between NotImplemented and // MetadataNotFound auto rootid = requestsRegistry_->genRootId(); auto baseReqCtx = serviceMetadata_->getBaseContextForRequest(methodMetadataResult); auto reqCtx = baseReqCtx ? folly::RequestContext::copyAsRoot(*baseReqCtx, rootid) : std::make_shared<folly::RequestContext>(rootid); folly::RequestContextScopeGuard rctx(reqCtx); // A request should not be active until the overload checking is done. auto request = makeRequest( std::move(metadata), std::move(debugPayload), std::move(reqCtx)); // check if server is overloaded const auto& headers = request->getTHeader().getHeaders(); const auto& name = request->getMethodName(); bool useResourcePools = useResourcePoolsFlagsSet() && !serverConfigs_->resourcePoolSet().empty(); if (useResourcePools) { serverConfigs_->incActiveRequests(); if (!serverConfigs_->shouldHandleRequestForMethod(name)) { handleServerNotReady(std::move(request)); return; } } else { auto errorCode = serverConfigs_->checkOverload(&headers, &name); serverConfigs_->incActiveRequests(); if (UNLIKELY(errorCode.has_value())) { handleRequestOverloadedServer(std::move(request), errorCode.value()); return; } if (!serverConfigs_->shouldHandleRequestForMethod(name)) { handleServerNotReady(std::move(request)); return; } auto preprocessResult = serverConfigs_->preprocess( {headers, name, connContext_, request.get()}); if (UNLIKELY(!std::holds_alternative<std::monostate>(preprocessResult))) { folly::variant_match( preprocessResult, [&](AppClientException& ace) { handleAppError( std::move(request), ace.name(), ace.getMessage(), true); }, [&](AppOverloadedException&) { handleRequestOverloadedServer( std::move(request), kAppOverloadedErrorCode); }, [&](const AppServerException& ase) { handleAppError( std::move(request), ase.name(), ase.getMessage(), false); }, [](std::monostate&) { folly::assume_unreachable(); }); return; } } logSetupConnectionEventsOnce(setupLoggingFlag_, connContext_); auto* cpp2ReqCtx = request->getRequestContext(); auto& timestamps = cpp2ReqCtx->getTimestamps(); timestamps.setStatus(samplingStatus); if (UNLIKELY(samplingStatus.isEnabled())) { timestamps.readEnd = readEnd; timestamps.processBegin = std::chrono::steady_clock::now(); } // Log monitoring methods that are called over non-monitoring interface so // that they can be migrated. LoggingSampler monitoringLogSampler{ THRIFT_FLAG(monitoring_over_user_logging_sample_rate)}; if (UNLIKELY(monitoringLogSampler.isSampled())) { if (LIKELY(connContext_.getInterfaceKind() != InterfaceKind::MONITORING)) { auto& methodName = request->getMethodName(); if (UNLIKELY(isMonitoringMethodName(methodName))) { THRIFT_CONNECTION_EVENT(monitoring_over_user) .logSampled(connContext_, monitoringLogSampler, [&] { return folly::dynamic::object("method_name", methodName); }); } } } if (serverConfigs_) { if (auto* observer = serverConfigs_->getObserver()) { observer->admittedRequest(&request->getMethodName()); // Expensive operations; happens only when sampling is enabled if (samplingStatus.isEnabledByServer()) { if (threadManager_) { observer->queuedRequests(threadManager_->pendingUpstreamTaskCount()); } observer->activeRequests(serverConfigs_->getActiveRequests()); } } } const auto protocolId = request->getProtoId(); if (auto interactionId = metadata.interactionId_ref()) { cpp2ReqCtx->setInteractionId(*interactionId); } if (auto interactionCreate = metadata.interactionCreate_ref()) { cpp2ReqCtx->setInteractionCreate(*interactionCreate); DCHECK_EQ(cpp2ReqCtx->getInteractionId(), 0); cpp2ReqCtx->setInteractionId(*interactionCreate->interactionId_ref()); } auto serializedCompressedRequest = SerializedCompressedRequest( std::move(data), metadata.crc32c_ref() ? CompressionAlgorithm::NONE : metadata.compression_ref().value_or(CompressionAlgorithm::NONE)); try { if (auto* found = std::get_if<PerServiceMetadata::MetadataFound>( &methodMetadataResult); LIKELY(found != nullptr)) { if (useResourcePools) { const ServiceRequestInfo* serviceRequestInfo = serviceRequestInfoMap_ ? folly::get_ptr( serviceRequestInfoMap_->get(), request->getMethodName()) : nullptr; if (!serviceRequestInfo) { std::string_view methodName = request->getMethodName(); AsyncProcessorHelper::sendUnknownMethodError( std::move(request), methodName); return; } ServerRequest serverRequest( std::move(request), std::move(serializedCompressedRequest), eventBase_, cpp2ReqCtx, protocolId, folly::RequestContext::saveContext(), processor_.get(), &found->metadata, serviceRequestInfo); // Once we remove the old code we'll move validateRpcKind to a helper. if (!GeneratedAsyncProcessor::validateRpcKind( serverRequest.request(), serverRequest.requestInfo()->rpcKind)) { return; } auto poolResult = AsyncProcessorHelper::selectResourcePool( serverRequest, found->metadata); if (auto* reject = std::get_if<ServerRequestRejection>(&poolResult)) { auto errorCode = kAppOverloadedErrorCode; if (reject->applicationException().getType() == TApplicationException::UNKNOWN_METHOD) { errorCode = kMethodUnknownErrorCode; } serverRequest.request()->sendErrorWrapped( folly::exception_wrapper( folly::in_place, std::move(*reject).applicationException()), errorCode); return; } auto resourcePoolHandle = std::get_if<std::reference_wrapper<const ResourcePoolHandle>>( &poolResult); DCHECK(serverConfigs_->resourcePoolSet().hasResourcePool( *resourcePoolHandle)); auto& resourcePool = serverConfigs_->resourcePoolSet().resourcePool(*resourcePoolHandle); apache::thrift::detail::ServerRequestHelper::setExecutor( serverRequest, resourcePool.executor().value_or(nullptr)); auto result = resourcePool.accept(std::move(serverRequest)); if (result) { auto errorCode = kQueueOverloadedErrorCode; serverRequest.request()->sendErrorWrapped( folly::exception_wrapper( folly::in_place, std::move(std::move(result).value()).applicationException()), errorCode); return; } } else { processor_->processSerializedCompressedRequestWithMetadata( std::move(request), std::move(serializedCompressedRequest), found->metadata, protocolId, cpp2ReqCtx, eventBase_, threadManager_.get()); } } else if (std::holds_alternative< PerServiceMetadata::MetadataNotImplemented>( methodMetadataResult)) { // The AsyncProcessorFactory does not implement createMethodMetadata // so we need to fallback to processSerializedCompressedRequest. processor_->processSerializedCompressedRequest( std::move(request), std::move(serializedCompressedRequest), protocolId, cpp2ReqCtx, eventBase_, threadManager_.get()); } else if (std::holds_alternative<PerServiceMetadata::MetadataNotFound>( methodMetadataResult)) { std::string_view methodName = request->getMethodName(); AsyncProcessorHelper::sendUnknownMethodError( std::move(request), methodName); } else { LOG(FATAL) << "Invalid PerServiceMetadata from Cpp2Worker"; } } catch (...) { LOG(DFATAL) << "AsyncProcessor::process exception: " << folly::exceptionStr(std::current_exception()); } } void ThriftRocketServerHandler::handleRequestWithBadMetadata( ThriftRequestCoreUniquePtr request) { if (auto* observer = serverConfigs_->getObserver()) { observer->taskKilled(); } request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::UNSUPPORTED_CLIENT_TYPE, "Invalid metadata object"), kRequestParsingErrorCode); } void ThriftRocketServerHandler::handleRequestWithBadChecksum( ThriftRequestCoreUniquePtr request) { if (auto* observer = serverConfigs_->getObserver()) { observer->taskKilled(); } request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::CHECKSUM_MISMATCH, "Checksum mismatch"), kChecksumMismatchErrorCode); } void ThriftRocketServerHandler::handleDecompressionFailure( ThriftRequestCoreUniquePtr request, std::string&& reason) { if (auto* observer = serverConfigs_->getObserver()) { observer->taskKilled(); } request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::INVALID_TRANSFORM, fmt::format("decompression failure: {}", std::move(reason))), kRequestParsingErrorCode); } void ThriftRocketServerHandler::handleRequestOverloadedServer( ThriftRequestCoreUniquePtr request, const std::string& errorCode) { if (auto* observer = serverConfigs_->getObserver()) { observer->serverOverloaded(); } request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::LOADSHEDDING, "loadshedding request"), errorCode); } void ThriftRocketServerHandler::handleAppError( ThriftRequestCoreUniquePtr request, const std::string& name, const std::string& message, bool isClientError) { static const std::string headerEx = "uex"; static const std::string headerExWhat = "uexw"; auto header = request->getRequestContext()->getHeader(); header->setHeader(headerEx, name); header->setHeader(headerExWhat, message); request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::UNKNOWN, std::move(message)), isClientError ? kAppClientErrorCode : kAppServerErrorCode); } void ThriftRocketServerHandler::handleServerNotReady( ThriftRequestCoreUniquePtr request) { if (auto* observer = serverConfigs_->getObserver()) { observer->taskKilled(); } request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::LOADSHEDDING, "server not ready"), kQueueOverloadedErrorCode); } void ThriftRocketServerHandler::handleServerShutdown( ThriftRequestCoreUniquePtr request) { if (auto* observer = serverConfigs_->getObserver()) { observer->taskKilled(); } request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::LOADSHEDDING, "server shutting down"), kQueueOverloadedErrorCode); } void ThriftRocketServerHandler::handleInjectedFault( ThriftRequestCoreUniquePtr request, InjectedFault fault) { switch (fault) { case InjectedFault::ERROR: if (auto* observer = serverConfigs_->getObserver()) { observer->taskKilled(); } request->sendErrorWrapped( folly::make_exception_wrapper<TApplicationException>( TApplicationException::INJECTED_FAILURE, "injected failure"), kInjectedFailureErrorCode); return; case InjectedFault::DROP: VLOG(1) << "ERROR: injected drop: " << connContext_.getPeerAddress()->getAddressStr(); return; case InjectedFault::DISCONNECT: return request->closeConnection( folly::make_exception_wrapper<TApplicationException>( TApplicationException::INJECTED_FAILURE, "injected failure")); return; } } void ThriftRocketServerHandler::requestComplete() { serverConfigs_->decActiveRequests(); } void ThriftRocketServerHandler::terminateInteraction(int64_t id) { if (processor_) { processor_->terminateInteraction(id, connContext_, *eventBase_); } } void ThriftRocketServerHandler::onBeforeHandleFrame() { worker_->getServer()->touchRequestTimestamp(); } } // namespace rocket } // namespace thrift } // namespace apache