void ThriftProcessor::onThriftRequest()

in thrift/lib/cpp2/server/ThriftProcessor.cpp [40:186]


// Entry point for a single incoming Thrift request.
//
// Validates the request metadata and (when present) the payload checksum,
// wraps everything in a ThriftRequest, looks up the per-method metadata for
// the target method and then hands the request to the processor, either
// directly or through a resource pool when the server has any configured.
//
// Parameters:
//   metadata    - request metadata; must carry protocol, name and kind,
//                 otherwise an error is sent back instead of processing.
//   payload     - serialized request body; DCHECKed to be non-null.
//   channel     - channel the request arrived on; DCHECKed to be non-null.
//                 Its event base is used for scheduling error replies and is
//                 passed along to the processor.
//   connContext - connection context; dereferenced unconditionally to obtain
//                 the worker, then moved into the ThriftRequest.
//
// The function is noexcept: an exception escaping from here terminates the
// process, so nothing in the body may be allowed to throw.
void ThriftProcessor::onThriftRequest(
    RequestRpcMetadata&& metadata,
    std::unique_ptr<IOBuf> payload,
    std::shared_ptr<ThriftChannelIf> channel,
    std::unique_ptr<Cpp2ConnContext> connContext) noexcept {
  DCHECK(payload);
  DCHECK(channel);

  // The processor is created lazily on the first request.
  auto& processorFactory = server_.getDecoratedProcessorFactory();
  if (processor_ == nullptr) {
    processor_ = processorFactory.getProcessor();
  }

  auto worker = connContext->getWorker();
  worker->getEventBase()->dcheckIsInEventBaseThread();

  // Both validity flags are computed before `metadata` is moved into the
  // ThriftRequest below.
  bool invalidMetadata =
      !(metadata.protocol_ref() && metadata.name_ref() && metadata.kind_ref());

  // A checksum is only verified when the client supplied one.
  bool invalidChecksum = metadata.crc32c_ref() &&
      *metadata.crc32c_ref() != apache::thrift::checksum::crc32c(*payload);

  // The request object is built even for invalid input because the error
  // replies below are sent through it.
  auto request = std::make_unique<ThriftRequest>(
      server_, channel, std::move(metadata), std::move(connContext));

  auto* evb = channel->getEventBase();
  if (UNLIKELY(invalidMetadata)) {
    LOG(ERROR) << "Invalid metadata object";
    evb->runInEventBaseThread([request = std::move(request)]() {
      request->sendErrorWrapped(
          folly::make_exception_wrapper<TApplicationException>(
              TApplicationException::UNSUPPORTED_CLIENT_TYPE,
              "invalid metadata object"),
          "corrupted metadata");
    });
    return;
  }
  if (UNLIKELY(invalidChecksum)) {
    LOG(ERROR) << "Invalid checksum";
    evb->runInEventBaseThread([request = std::move(request)]() {
      request->sendErrorWrapped(
          folly::make_exception_wrapper<TApplicationException>(
              TApplicationException::CHECKSUM_MISMATCH, "checksum mismatch"),
          "corrupted request");
    });
    return;
  }

  const auto& serviceMetadata = worker->getMetadataForService(processorFactory);
  using PerServiceMetadata = Cpp2Worker::PerServiceMetadata;
  const PerServiceMetadata::FindMethodResult methodMetadataResult =
      serviceMetadata.findMethod(request->getMethodName());

  // Install a request context for the remainder of this call: a child of the
  // service-provided base context when there is one, a fresh one otherwise.
  auto baseReqCtx =
      serviceMetadata.getBaseContextForRequest(methodMetadataResult);
  auto reqCtx = baseReqCtx ? folly::RequestContext::copyAsChild(*baseReqCtx)
                           : std::make_shared<folly::RequestContext>();
  folly::RequestContextScopeGuard rctx(reqCtx);

  // Read these out now; `request` is moved away in every branch below.
  auto protoId = request->getProtoId();
  auto reqContext = request->getRequestContext();

  folly::variant_match(
      methodMetadataResult,
      [&](PerServiceMetadata::MetadataNotImplemented) {
        // The AsyncProcessorFactory does not implement createMethodMetadata
        // so we need to fallback to processSerializedCompressedRequest.
        processor_->processSerializedCompressedRequest(
            std::move(request),
            SerializedCompressedRequest(std::move(payload)),
            protoId,
            reqContext,
            evb,
            server_.getThreadManager().get());
      },
      [&](PerServiceMetadata::MetadataNotFound) {
        // Fetch the method name before `request` is handed over.
        std::string_view methodName = request->getMethodName();
        AsyncProcessorHelper::sendUnknownMethodError(
            std::move(request), methodName);
      },
      [&](const PerServiceMetadata::MetadataFound& found) {
        if (!server_.resourcePoolSet().empty()) {
          // We need to process this using request pools
          const ServiceRequestInfo* serviceRequestInfo{nullptr};
          if (auto requestInfo = processorFactory.getServiceRequestInfoMap()) {
            // Look the entry up with find() rather than at(): at() throws on
            // a missing key and this function is noexcept, so a miss would
            // terminate the process. On a miss the pointer simply stays
            // null, the same value used when the factory has no map at all.
            const auto& requestInfoMap = requestInfo->get();
            auto infoIt = requestInfoMap.find(request->getMethodName());
            if (infoIt != requestInfoMap.end()) {
              serviceRequestInfo = &infoIt->second;
            }
          }
          ServerRequest serverRequest(
              std::move(request),
              SerializedCompressedRequest(std::move(payload)),
              evb,
              reqContext,
              protoId,
              folly::RequestContext::saveContext(),
              processor_.get(),
              &found.metadata,
              serviceRequestInfo);

          auto poolResult = AsyncProcessorHelper::selectResourcePool(
              serverRequest, found.metadata);
          if (auto* reject = std::get_if<ServerRequestRejection>(&poolResult)) {
            // Pool selection refused the request. An unknown-method
            // rejection gets its own error code; everything else is
            // reported as application overload.
            auto errorCode = kAppOverloadedErrorCode;
            if (reject->applicationException().getType() ==
                TApplicationException::UNKNOWN_METHOD) {
              errorCode = kMethodUnknownErrorCode;
            }
            serverRequest.request()->sendErrorWrapped(
                folly::exception_wrapper(
                    folly::in_place, std::move(*reject).applicationException()),
                errorCode);
            return;
          }

          // NOTE(review): this assumes poolResult holds a pool handle
          // whenever it is not a rejection; get_if yields nullptr for any
          // other alternative and the result is dereferenced unchecked.
          // Confirm against the variant returned by selectResourcePool.
          auto resourcePoolHandle =
              std::get_if<std::reference_wrapper<const ResourcePoolHandle>>(
                  &poolResult);
          DCHECK(
              server_.resourcePoolSet().hasResourcePool(*resourcePoolHandle));
          auto& resourcePool =
              server_.resourcePoolSet().resourcePool(*resourcePoolHandle);
          apache::thrift::detail::ServerRequestHelper::setExecutor(
              serverRequest, resourcePool.executor().value_or(nullptr));

          // NOTE(review): serverRequest is read again below after being
          // passed through std::move. That is only valid if accept() leaves
          // the request intact when it returns a rejection. Confirm against
          // the accept() contract.
          auto result = resourcePool.accept(std::move(serverRequest));
          if (result) {
            auto errorCode = kQueueOverloadedErrorCode;
            serverRequest.request()->sendErrorWrapped(
                folly::exception_wrapper(
                    folly::in_place,
                    std::move(std::move(result).value())
                        .applicationException()),
                errorCode);
            return;
          }
        } else {
          // No resource pools configured: dispatch straight to the
          // processor with the method metadata that was found.
          processor_->processSerializedCompressedRequestWithMetadata(
              std::move(request),
              SerializedCompressedRequest(std::move(payload)),
              found.metadata,
              protoId,
              reqContext,
              evb,
              server_.getThreadManager().get());
        }
      });
}