void ThriftRocketServerHandler::handleRequestCommon()

in thrift/lib/cpp2/transport/rocket/server/ThriftRocketServerHandler.cpp [368:721]


void ThriftRocketServerHandler::handleRequestCommon(
    Payload&& payload, F&& makeRequest, RpcKind expectedKind) {
  // setup request sampling for counters and stats
  auto samplingStatus = shouldSample();
  std::chrono::steady_clock::time_point readEnd;
  if (UNLIKELY(samplingStatus.isEnabled())) {
    readEnd = std::chrono::steady_clock::now();
  }

  rocket::Payload debugPayload = payload.clone();
  auto requestPayloadTry =
      unpackAsCompressed<RequestPayload>(std::move(payload));

  auto makeActiveRequest = [&](auto&& md, auto&& payload, auto&& reqCtx) {
    serverConfigs_->incActiveRequests();
    return makeRequest(std::move(md), std::move(payload), std::move(reqCtx));
  };

  auto createDefaultRequestContext = [&] {
    return std::make_shared<folly::RequestContext>(
        requestsRegistry_->genRootId());
  };

  if (requestPayloadTry.hasException()) {
    handleDecompressionFailure(
        makeActiveRequest(
            RequestRpcMetadata(),
            rocket::Payload{},
            createDefaultRequestContext()),
        requestPayloadTry.exception().what().toStdString());
    return;
  }

  auto& data = requestPayloadTry->payload;
  auto& metadata = requestPayloadTry->metadata;

  if (!isMetadataValid(metadata, expectedKind)) {
    handleRequestWithBadMetadata(makeActiveRequest(
        std::move(metadata),
        std::move(debugPayload),
        createDefaultRequestContext()));
    return;
  }

  if (worker_->isStopping()) {
    handleServerShutdown(makeActiveRequest(
        std::move(metadata),
        std::move(debugPayload),
        createDefaultRequestContext()));
    return;
  }

  THRIFT_APPLICATION_EVENT(server_read_headers).log([&] {
    auto size =
        metadata.otherMetadata_ref() ? metadata.otherMetadata_ref()->size() : 0;
    std::vector<folly::dynamic> keys;
    if (size) {
      keys.reserve(size);
      for (auto& [k, v] : *metadata.otherMetadata_ref()) {
        keys.push_back(k);
      }
    }
    int fmd_sz = 0;
    if (auto fmd = metadata.frameworkMetadata_ref()) {
      DCHECK(*fmd) << "initialized IOBuf is null";
      fmd_sz = (**fmd).computeChainDataLength();
    }
    return folly::dynamic::object("size", size) //
        ("keys", folly::dynamic::array(std::move(keys))) //
        ("frameworkMetadataSize", fmd_sz);
  });

  if (metadata.crc32c_ref()) {
    try {
      if (auto compression = metadata.compression_ref()) {
        data = uncompressBuffer(std::move(data), *compression);
      }
    } catch (...) {
      handleDecompressionFailure(
          makeActiveRequest(
              std::move(metadata),
              rocket::Payload{},
              createDefaultRequestContext()),
          folly::exceptionStr(std::current_exception()).toStdString());
      return;
    }
  }

  // check the checksum
  const bool badChecksum = metadata.crc32c_ref() &&
      (*metadata.crc32c_ref() != checksum::crc32c(*data));

  if (badChecksum) {
    handleRequestWithBadChecksum(makeActiveRequest(
        std::move(metadata),
        std::move(debugPayload),
        createDefaultRequestContext()));
    return;
  }

  if (auto injectedFailure = worker_->getServer()->maybeInjectFailure();
      injectedFailure != ThriftServer::InjectedFailure::NONE) {
    InjectedFault injectedFault;
    switch (injectedFailure) {
      case ThriftServer::InjectedFailure::NONE:
        folly::assume_unreachable();
      case ThriftServer::InjectedFailure::ERROR:
        injectedFault = InjectedFault::ERROR;
        break;
      case ThriftServer::InjectedFailure::DROP:
        injectedFault = InjectedFault::DROP;
        break;
      case ThriftServer::InjectedFailure::DISCONNECT:
        injectedFault = InjectedFault::DISCONNECT;
        break;
    }
    handleInjectedFault(
        makeActiveRequest(
            std::move(metadata),
            std::move(debugPayload),
            createDefaultRequestContext()),
        injectedFault);
    return;
  }

  using PerServiceMetadata = Cpp2Worker::PerServiceMetadata;
  const PerServiceMetadata::FindMethodResult methodMetadataResult =
      serviceMetadata_->findMethod(
          metadata.name_ref()
              ? metadata.name_ref()->view()
              : std::string_view{}); // need to call with empty string_view
                                     // because we still distinguish
                                     // between NotImplemented and
                                     // MetadataNotFound

  auto rootid = requestsRegistry_->genRootId();
  auto baseReqCtx =
      serviceMetadata_->getBaseContextForRequest(methodMetadataResult);
  auto reqCtx = baseReqCtx
      ? folly::RequestContext::copyAsRoot(*baseReqCtx, rootid)
      : std::make_shared<folly::RequestContext>(rootid);
  folly::RequestContextScopeGuard rctx(reqCtx);

  // A request should not be active until the overload checking is done.
  auto request = makeRequest(
      std::move(metadata), std::move(debugPayload), std::move(reqCtx));

  // check if server is overloaded
  const auto& headers = request->getTHeader().getHeaders();
  const auto& name = request->getMethodName();

  bool useResourcePools =
      useResourcePoolsFlagsSet() && !serverConfigs_->resourcePoolSet().empty();

  if (useResourcePools) {
    serverConfigs_->incActiveRequests();

    if (!serverConfigs_->shouldHandleRequestForMethod(name)) {
      handleServerNotReady(std::move(request));
      return;
    }
  } else {
    auto errorCode = serverConfigs_->checkOverload(&headers, &name);
    serverConfigs_->incActiveRequests();
    if (UNLIKELY(errorCode.has_value())) {
      handleRequestOverloadedServer(std::move(request), errorCode.value());
      return;
    }

    if (!serverConfigs_->shouldHandleRequestForMethod(name)) {
      handleServerNotReady(std::move(request));
      return;
    }

    auto preprocessResult = serverConfigs_->preprocess(
        {headers, name, connContext_, request.get()});
    if (UNLIKELY(!std::holds_alternative<std::monostate>(preprocessResult))) {
      folly::variant_match(
          preprocessResult,
          [&](AppClientException& ace) {
            handleAppError(
                std::move(request), ace.name(), ace.getMessage(), true);
          },
          [&](AppOverloadedException&) {
            handleRequestOverloadedServer(
                std::move(request), kAppOverloadedErrorCode);
          },
          [&](const AppServerException& ase) {
            handleAppError(
                std::move(request), ase.name(), ase.getMessage(), false);
          },
          [](std::monostate&) { folly::assume_unreachable(); });

      return;
    }
  }

  logSetupConnectionEventsOnce(setupLoggingFlag_, connContext_);

  auto* cpp2ReqCtx = request->getRequestContext();
  auto& timestamps = cpp2ReqCtx->getTimestamps();
  timestamps.setStatus(samplingStatus);
  if (UNLIKELY(samplingStatus.isEnabled())) {
    timestamps.readEnd = readEnd;
    timestamps.processBegin = std::chrono::steady_clock::now();
  }

  // Log monitoring methods that are called over non-monitoring interface so
  // that they can be migrated.
  LoggingSampler monitoringLogSampler{
      THRIFT_FLAG(monitoring_over_user_logging_sample_rate)};
  if (UNLIKELY(monitoringLogSampler.isSampled())) {
    if (LIKELY(connContext_.getInterfaceKind() != InterfaceKind::MONITORING)) {
      auto& methodName = request->getMethodName();
      if (UNLIKELY(isMonitoringMethodName(methodName))) {
        THRIFT_CONNECTION_EVENT(monitoring_over_user)
            .logSampled(connContext_, monitoringLogSampler, [&] {
              return folly::dynamic::object("method_name", methodName);
            });
      }
    }
  }

  if (serverConfigs_) {
    if (auto* observer = serverConfigs_->getObserver()) {
      observer->admittedRequest(&request->getMethodName());
      // Expensive operations; happens only when sampling is enabled
      if (samplingStatus.isEnabledByServer()) {
        if (threadManager_) {
          observer->queuedRequests(threadManager_->pendingUpstreamTaskCount());
        }
        observer->activeRequests(serverConfigs_->getActiveRequests());
      }
    }
  }
  const auto protocolId = request->getProtoId();
  if (auto interactionId = metadata.interactionId_ref()) {
    cpp2ReqCtx->setInteractionId(*interactionId);
  }
  if (auto interactionCreate = metadata.interactionCreate_ref()) {
    cpp2ReqCtx->setInteractionCreate(*interactionCreate);
    DCHECK_EQ(cpp2ReqCtx->getInteractionId(), 0);
    cpp2ReqCtx->setInteractionId(*interactionCreate->interactionId_ref());
  }

  auto serializedCompressedRequest = SerializedCompressedRequest(
      std::move(data),
      metadata.crc32c_ref()
          ? CompressionAlgorithm::NONE
          : metadata.compression_ref().value_or(CompressionAlgorithm::NONE));

  try {
    if (auto* found = std::get_if<PerServiceMetadata::MetadataFound>(
            &methodMetadataResult);
        LIKELY(found != nullptr)) {
      if (useResourcePools) {
        const ServiceRequestInfo* serviceRequestInfo = serviceRequestInfoMap_
            ? folly::get_ptr(
                  serviceRequestInfoMap_->get(), request->getMethodName())
            : nullptr;
        if (!serviceRequestInfo) {
          std::string_view methodName = request->getMethodName();
          AsyncProcessorHelper::sendUnknownMethodError(
              std::move(request), methodName);
          return;
        }

        ServerRequest serverRequest(
            std::move(request),
            std::move(serializedCompressedRequest),
            eventBase_,
            cpp2ReqCtx,
            protocolId,
            folly::RequestContext::saveContext(),
            processor_.get(),
            &found->metadata,
            serviceRequestInfo);

        // Once we remove the old code we'll move validateRpcKind to a helper.
        if (!GeneratedAsyncProcessor::validateRpcKind(
                serverRequest.request(),
                serverRequest.requestInfo()->rpcKind)) {
          return;
        }

        auto poolResult = AsyncProcessorHelper::selectResourcePool(
            serverRequest, found->metadata);
        if (auto* reject = std::get_if<ServerRequestRejection>(&poolResult)) {
          auto errorCode = kAppOverloadedErrorCode;
          if (reject->applicationException().getType() ==
              TApplicationException::UNKNOWN_METHOD) {
            errorCode = kMethodUnknownErrorCode;
          }
          serverRequest.request()->sendErrorWrapped(
              folly::exception_wrapper(
                  folly::in_place, std::move(*reject).applicationException()),
              errorCode);
          return;
        }

        auto resourcePoolHandle =
            std::get_if<std::reference_wrapper<const ResourcePoolHandle>>(
                &poolResult);
        DCHECK(serverConfigs_->resourcePoolSet().hasResourcePool(
            *resourcePoolHandle));
        auto& resourcePool =
            serverConfigs_->resourcePoolSet().resourcePool(*resourcePoolHandle);
        apache::thrift::detail::ServerRequestHelper::setExecutor(
            serverRequest, resourcePool.executor().value_or(nullptr));
        auto result = resourcePool.accept(std::move(serverRequest));
        if (result) {
          auto errorCode = kQueueOverloadedErrorCode;
          serverRequest.request()->sendErrorWrapped(
              folly::exception_wrapper(
                  folly::in_place,
                  std::move(std::move(result).value()).applicationException()),
              errorCode);
          return;
        }
      } else {
        processor_->processSerializedCompressedRequestWithMetadata(
            std::move(request),
            std::move(serializedCompressedRequest),
            found->metadata,
            protocolId,
            cpp2ReqCtx,
            eventBase_,
            threadManager_.get());
      }
    } else if (std::holds_alternative<
                   PerServiceMetadata::MetadataNotImplemented>(
                   methodMetadataResult)) {
      // The AsyncProcessorFactory does not implement createMethodMetadata
      // so we need to fallback to processSerializedCompressedRequest.
      processor_->processSerializedCompressedRequest(
          std::move(request),
          std::move(serializedCompressedRequest),
          protocolId,
          cpp2ReqCtx,
          eventBase_,
          threadManager_.get());
    } else if (std::holds_alternative<PerServiceMetadata::MetadataNotFound>(
                   methodMetadataResult)) {
      std::string_view methodName = request->getMethodName();
      AsyncProcessorHelper::sendUnknownMethodError(
          std::move(request), methodName);
    } else {
      LOG(FATAL) << "Invalid PerServiceMetadata from Cpp2Worker";
    }
  } catch (...) {
    LOG(DFATAL) << "AsyncProcessor::process exception: "
                << folly::exceptionStr(std::current_exception());
  }
}