void Cpp2Connection::requestReceived()

in thrift/lib/cpp2/server/Cpp2Connection.cpp [310:720]


void Cpp2Connection::requestReceived(
    unique_ptr<HeaderServerChannel::HeaderRequest>&& hreq) {
  auto& samplingStatus = hreq->getSamplingStatus();
  std::chrono::steady_clock::time_point readEnd;
  if (samplingStatus.isEnabled()) {
    readEnd = std::chrono::steady_clock::now();
  }

  bool useHttpHandler = false;
  // Any POST not for / should go to the status handler
  if (hreq->getHeader()->getClientType() == THRIFT_HTTP_SERVER_TYPE) {
    auto buf = hreq->getBuf();
    // 7 == length of "POST / " - we are matching on the path
    if (buf->length() >= 7 &&
        0 == strncmp(reinterpret_cast<const char*>(buf->data()), "POST", 4) &&
        buf->data()[6] != ' ') {
      useHttpHandler = true;
    }

    // Any GET should use the handler
    if (buf->length() >= 3 &&
        0 == strncmp(reinterpret_cast<const char*>(buf->data()), "GET", 3)) {
      useHttpHandler = true;
    }

    // Any HEAD should use the handler
    if (buf->length() >= 4 &&
        0 == strncmp(reinterpret_cast<const char*>(buf->data()), "HEAD", 4)) {
      useHttpHandler = true;
    }
  }

  if (useHttpHandler && worker_->getServer()->getGetHandler()) {
    worker_->getServer()->getGetHandler()(
        worker_->getEventBase(),
        worker_->getConnectionManager(),
        transport_,
        hreq->extractBuf());

    // Close the channel, since the handler now owns the socket.
    channel_->setCallback(nullptr);
    channel_->setTransport(nullptr);
    stop();
    return;
  }

  if (THRIFT_FLAG(server_header_reject_http) &&
      (hreq->getHeader()->getClientType() == THRIFT_HTTP_SERVER_TYPE ||
       hreq->getHeader()->getClientType() == THRIFT_HTTP_CLIENT_TYPE ||
       hreq->getHeader()->getClientType() == THRIFT_HTTP_GET_CLIENT_TYPE)) {
    disconnect("Rejecting HTTP connection over Header");
    return;
  }
  if (THRIFT_FLAG(server_header_reject_framed) &&
      (hreq->getHeader()->getClientType() == THRIFT_FRAMED_DEPRECATED ||
       hreq->getHeader()->getClientType() == THRIFT_FRAMED_COMPACT)) {
    disconnect("Rejecting framed connection over Header");
    return;
  }
  if (THRIFT_FLAG(server_header_reject_unframed) &&
      (hreq->getHeader()->getClientType() == THRIFT_UNFRAMED_DEPRECATED ||
       hreq->getHeader()->getClientType() ==
           THRIFT_UNFRAMED_COMPACT_DEPRECATED)) {
    disconnect("Rejecting unframed connection over Header");
    return;
  }

  auto protoId = static_cast<apache::thrift::protocol::PROTOCOL_TYPES>(
      hreq->getHeader()->getProtocolId());
  auto msgBegin = apache::thrift::detail::ap::deserializeMessageBegin(
      *hreq->getBuf(), protoId);
  std::string& methodName = msgBegin.methodName;
  const auto& meta = msgBegin.metadata;

  // Transport upgrade: check if client requested transport upgrade from header
  // to rocket. If yes, reply immediately and upgrade the transport after
  // sending the reply.
  if (methodName == "upgradeToRocket") {
    if (THRIFT_FLAG(server_rocket_upgrade_enabled)) {
      ResponsePayload response;
      switch (protoId) {
        case apache::thrift::protocol::T_BINARY_PROTOCOL:
          response = upgradeToRocketReply<apache::thrift::BinaryProtocolWriter>(
              meta.seqId);
          break;
        case apache::thrift::protocol::T_COMPACT_PROTOCOL:
          response =
              upgradeToRocketReply<apache::thrift::CompactProtocolWriter>(
                  meta.seqId);
          break;
        default:
          LOG(DFATAL) << "Unsupported protocol found";
          // if protocol is neither binary or compact, we want to kill the
          // request and abort upgrade
          killRequest(
              std::move(hreq),
              TApplicationException::TApplicationExceptionType::
                  INVALID_PROTOCOL,
              kUnknownErrorCode,
              "invalid protocol used");
          return;
      }

      hreq->sendReply(
          std::move(response),
          new TransportUpgradeSendCallback(
              transport_,
              context_.getPeerAddress(),
              getWorker(),
              this,
              channel_.get()));
      return;
    } else {
      killRequest(
          std::move(hreq),
          TApplicationException::TApplicationExceptionType::UNKNOWN_METHOD,
          kMethodUnknownErrorCode,
          "Rocket upgrade disabled");
      return;
    }
  }

  if (worker_->getServer()->isHeaderDisabled()) {
    disconnect("Rejecting Header connection");
    return;
  }

  using PerServiceMetadata = Cpp2Worker::PerServiceMetadata;
  const PerServiceMetadata::FindMethodResult methodMetadataResult =
      serviceMetadata_.findMethod(methodName);

  auto baseReqCtx =
      serviceMetadata_.getBaseContextForRequest(methodMetadataResult);
  auto rootid = worker_->getRequestsRegistry()->genRootId();
  auto reqCtx = baseReqCtx
      ? folly::RequestContext::copyAsRoot(*baseReqCtx, rootid)
      : std::make_shared<folly::RequestContext>(rootid);

  folly::RequestContextScopeGuard rctx(reqCtx);

  auto server = worker_->getServer();
  server->touchRequestTimestamp();

  auto* observer = server->getObserver();
  if (observer) {
    observer->receivedRequest(&methodName);
  }

  auto injectedFailure = server->maybeInjectFailure();
  switch (injectedFailure) {
    case ThriftServer::InjectedFailure::NONE:
      break;
    case ThriftServer::InjectedFailure::ERROR:
      killRequest(
          std::move(hreq),
          TApplicationException::TApplicationExceptionType::INJECTED_FAILURE,
          kInjectedFailureErrorCode,
          "injected failure");
      return;
    case ThriftServer::InjectedFailure::DROP:
      VLOG(1) << "ERROR: injected drop: "
              << context_.getPeerAddress()->getAddressStr();
      return;
    case ThriftServer::InjectedFailure::DISCONNECT:
      disconnect("injected failure");
      return;
  }

  if (server->getGetHeaderHandler()) {
    server->getGetHeaderHandler()(hreq->getHeader(), context_.getPeerAddress());
  }

  if (auto overloadResult = server->checkOverload(
          &hreq->getHeader()->getHeaders(), &methodName)) {
    killRequest(
        std::move(hreq),
        TApplicationException::LOADSHEDDING,
        overloadResult.value(),
        "loadshedding request");
    return;
  }

  if (auto preprocessResult = server->preprocess(
          {hreq->getHeader()->getHeaders(), methodName, context_});
      !std::holds_alternative<std::monostate>(preprocessResult)) {
    folly::variant_match(
        preprocessResult,
        [&](AppClientException& ace) {
          handleAppError(std::move(hreq), ace.name(), ace.getMessage(), true);
        },
        [&](AppOverloadedException& aoe) {
          killRequest(
              std::move(hreq),
              TApplicationException::LOADSHEDDING,
              kAppOverloadedErrorCode,
              aoe.getMessage().c_str());
        },
        [&](AppServerException& ase) {
          handleAppError(std::move(hreq), ase.name(), ase.getMessage(), false);
        },
        [](std::monostate&) { folly::assume_unreachable(); });
    return;
  }

  if (worker_->isStopping()) {
    killRequest(
        std::move(hreq),
        TApplicationException::TApplicationExceptionType::INTERNAL_ERROR,
        kQueueOverloadedErrorCode,
        "server shutting down");
    return;
  }

  if (!server->shouldHandleRequestForMethod(methodName)) {
    killRequest(
        std::move(hreq),
        TApplicationException::TApplicationExceptionType::INTERNAL_ERROR,
        kQueueOverloadedErrorCode,
        "server not ready");
    return;
  }

  // After this, the request buffer is no longer owned by the request
  // and will be released after deserializeRequest.
  auto serializedRequest = [&] {
    folly::IOBufQueue bufQueue;
    bufQueue.append(hreq->extractBuf());
    bufQueue.trimStart(meta.size);
    return SerializedRequest(bufQueue.move());
  }();

  // We keep a clone of the request payload buffer for debugging purposes, but
  // the lifetime of payload should not necessarily be the same as its request
  // object's.
  auto debugPayload =
      rocket::Payload::makeCombined(serializedRequest.buffer->clone(), 0);

  std::chrono::milliseconds queueTimeout;
  std::chrono::milliseconds taskTimeout;
  std::chrono::milliseconds clientQueueTimeout =
      hreq->getHeader()->getClientQueueTimeout();
  std::chrono::milliseconds clientTimeout =
      hreq->getHeader()->getClientTimeout();
  auto differentTimeouts = server->getTaskExpireTimeForRequest(
      clientQueueTimeout, clientTimeout, queueTimeout, taskTimeout);
  folly::call_once(clientInfoFlag_, [&] {
    if (const auto& m = hreq->getHeader()->extractClientMetadata()) {
      context_.setClientMetadata(*m);
    }
  });

  context_.setClientType(hreq->getHeader()->getClientType());

  auto t2r = RequestsRegistry::makeRequest<Cpp2Request>(
      std::move(hreq),
      std::move(reqCtx),
      this_,
      std::move(debugPayload),
      std::move(methodName));

  server->incActiveRequests();
  if (samplingStatus.isEnabled()) {
    // Expensive operations; happens only when sampling is enabled
    auto& timestamps = t2r->getTimestamps();
    timestamps.setStatus(samplingStatus);
    timestamps.readEnd = readEnd;
    timestamps.processBegin = std::chrono::steady_clock::now();
    if (samplingStatus.isEnabledByServer() && observer) {
      if (threadManager_) {
        observer->queuedRequests(threadManager_->pendingUpstreamTaskCount());
      }
      observer->activeRequests(server->getActiveRequests());
    }
  }

  activeRequests_.insert(t2r.get());

  auto reqContext = t2r->getContext();
  if (observer) {
    observer->admittedRequest(&reqContext->getMethodName());
  }

  if (differentTimeouts) {
    if (queueTimeout > std::chrono::milliseconds(0)) {
      scheduleTimeout(&t2r->queueTimeout_, queueTimeout);
    }
  }
  if (taskTimeout > std::chrono::milliseconds(0)) {
    scheduleTimeout(&t2r->taskTimeout_, taskTimeout);
  }

  if (clientTimeout > std::chrono::milliseconds::zero()) {
    reqContext->setRequestTimeout(clientTimeout);
  } else {
    reqContext->setRequestTimeout(taskTimeout);
  }

  // Log monitoring methods that are called over header interface so that they
  // can be migrated to rocket monitoring interface.
  LoggingSampler monitoringLogSampler{
      THRIFT_FLAG(monitoring_over_header_logging_sample_rate)};
  if (monitoringLogSampler.isSampled()) {
    if (isMonitoringMethodName(reqContext->getMethodName())) {
      THRIFT_CONNECTION_EVENT(monitoring_over_header)
          .logSampled(context_, monitoringLogSampler, [&] {
            return folly::dynamic::object(
                "method_name", reqContext->getMethodName());
          });
    }
  }

  try {
    ResponseChannelRequest::UniquePtr req = std::move(t2r);
    if (!apache::thrift::detail::ap::setupRequestContextWithMessageBegin(
            meta, protoId, req, reqContext, worker_->getEventBase())) {
      return;
    }

    folly::variant_match(
        methodMetadataResult,
        [&](PerServiceMetadata::MetadataNotImplemented) {
          logSetupConnectionEventsOnce(setupLoggingFlag_, context_);

          // The AsyncProcessorFactory does not implement createMethodMetadata
          // so we need to fallback to processSerializedCompressedRequest.
          processor_->processSerializedCompressedRequest(
              std::move(req),
              SerializedCompressedRequest(std::move(serializedRequest)),
              protoId,
              reqContext,
              worker_->getEventBase(),
              threadManager_.get());
        },
        [&](PerServiceMetadata::MetadataNotFound) {
          AsyncProcessorHelper::sendUnknownMethodError(
              std::move(req), reqContext->getMethodName());
        },
        [&](const PerServiceMetadata::MetadataFound& found) {
          logSetupConnectionEventsOnce(setupLoggingFlag_, context_);
          if (!server->resourcePoolSet().empty()) {
            // We need to process this using request pools
            const ServiceRequestInfo* serviceRequestInfo{nullptr};
            if (serviceRequestInfoMap_) {
              serviceRequestInfo = &serviceRequestInfoMap_->get().at(
                  reqContext->getMethodName());
            }
            ServerRequest serverRequest(
                std::move(req),
                SerializedCompressedRequest(std::move(serializedRequest)),
                worker_->getEventBase(),
                reqContext,
                protoId,
                folly::RequestContext::saveContext(),
                processor_.get(),
                &found.metadata,
                serviceRequestInfo);

            auto poolResult = AsyncProcessorHelper::selectResourcePool(
                serverRequest, found.metadata);
            if (auto* reject =
                    std::get_if<ServerRequestRejection>(&poolResult)) {
              auto errorCode = kAppOverloadedErrorCode;
              if (reject->applicationException().getType() ==
                  TApplicationException::UNKNOWN_METHOD) {
                errorCode = kMethodUnknownErrorCode;
              }
              serverRequest.request()->sendErrorWrapped(
                  folly::exception_wrapper(
                      folly::in_place,
                      std::move(*reject).applicationException()),
                  errorCode);
              return;
            }

            auto resourcePoolHandle =
                std::get_if<std::reference_wrapper<const ResourcePoolHandle>>(
                    &poolResult);
            DCHECK(
                server->resourcePoolSet().hasResourcePool(*resourcePoolHandle));
            auto& resourcePool =
                server->resourcePoolSet().resourcePool(*resourcePoolHandle);
            apache::thrift::detail::ServerRequestHelper::setExecutor(
                serverRequest, resourcePool.executor().value_or(nullptr));

            auto result = resourcePool.accept(std::move(serverRequest));
            if (result) {
              auto errorCode = kQueueOverloadedErrorCode;
              serverRequest.request()->sendErrorWrapped(
                  folly::exception_wrapper(
                      folly::in_place,
                      std::move(std::move(result).value())
                          .applicationException()),
                  errorCode);
              return;
            }
          } else {
            processor_->processSerializedCompressedRequestWithMetadata(
                std::move(req),
                SerializedCompressedRequest(std::move(serializedRequest)),
                found.metadata,
                protoId,
                reqContext,
                worker_->getEventBase(),
                threadManager_.get());
          }
        });
  } catch (...) {
    LOG(DFATAL) << "AsyncProcessor::process exception: "
                << folly::exceptionStr(std::current_exception());
  }
}