void ThriftRocketServerHandler::handleSetupFrame()

in thrift/lib/cpp2/transport/rocket/server/ThriftRocketServerHandler.cpp [119:252]


/**
 * Processes the SETUP frame received on a Rocket connection.
 *
 * The steps run in this order:
 *   1. Require non-empty metadata and inspect the leading 32-bit protocol
 *      key. An unrecognized or missing key is tolerated only when the frame
 *      uses Rocket MIME types.
 *   2. Deserialize RequestSetupMetadata with the compact protocol and pass it
 *      to the connection context.
 *   3. Negotiate the Rocket version against the client's [min, max] range.
 *   4. Offer the metadata to each registered setup-frame handler. The first
 *      handler that returns processor info configures this handler, and the
 *      function returns right after validating that configuration.
 *   5. Otherwise fall back to the server's defaults, pick up the observer's
 *      sample rate, optionally reflect DSCP/mark onto the socket, and send a
 *      setupResponse metadata push carrying the negotiated version.
 *
 * Every rejection made here closes `connection` with
 * ErrorCode::INVALID_SETUP. Any std::exception thrown from step 2 onwards is
 * reported through the same path.
 */
void ThriftRocketServerHandler::handleSetupFrame(
    SetupFrame&& frame, RocketServerConnection& connection) {
  // Shared rejection path. The reason is forwarded untouched so string
  // literals and temporaries reach RocketException exactly as written.
  const auto rejectSetup = [&connection](auto&& reason) {
    connection.close(folly::make_exception_wrapper<RocketException>(
        ErrorCode::INVALID_SETUP, std::forward<decltype(reason)>(reason)));
  };

  if (!frame.payload().hasNonemptyMetadata()) {
    rejectSetup("Missing required metadata in SETUP frame");
    return;
  }

  folly::io::Cursor cursor(frame.payload().buffer());

  // Validate the Thrift protocol key. `protocolKey` is only inspected when
  // the read succeeded; the `keyRead &&` guard keeps it that way.
  constexpr uint32_t kLegacyRocketProtocolKey = 1;
  uint32_t protocolKey;
  const bool keyRead = cursor.tryReadBE<uint32_t>(protocolKey);
  const bool keyRecognized = keyRead &&
      ((THRIFT_FLAG(rocket_server_legacy_protocol_key) &&
        protocolKey == kLegacyRocketProtocolKey) ||
       protocolKey == RpcMetadata_constants::kRocketProtocolKey());
  if (!keyRecognized) {
    if (!frame.rocketMimeTypes()) {
      rejectSetup("Incompatible Thrift version");
      return;
    }
    // With Rocket MIME types the protocol key is optional, so the four bytes
    // just consumed belong to the metadata: step back over them.
    if (keyRead) {
      cursor.retreat(4);
    }
  }

  try {
    CompactProtocolReader reader;
    reader.setInput(cursor);
    RequestSetupMetadata meta;
    meta.read(&reader); // throws on read error
    if (reader.getCursorPosition() > frame.payload().metadataSize()) {
      rejectSetup("Error deserializing SETUP payload: underflow");
      return;
    }
    connContext_.readSetupMetadata(meta);

    const auto clientMinVersion = meta.minVersion_ref().value_or(0);
    const auto clientMaxVersion = meta.maxVersion_ref().value_or(0);

    THRIFT_CONNECTION_EVENT(rocket.setup).log(connContext_, [&] {
      return folly::dynamic::object("client_min_version", clientMinVersion)(
          "client_max_version", clientMaxVersion)("server_version", version_);
    });

    // The client's range must overlap what this server supports.
    if (clientMinVersion > version_ ||
        clientMaxVersion < kRocketServerMinVersion) {
      rejectSetup("Incompatible Rocket version");
      return;
    }
    version_ = std::min(version_, clientMaxVersion);

    // From version 9 onwards the frame must use Rocket MIME types.
    if (version_ >= 9 && !frame.rocketMimeTypes()) {
      rejectSetup("Unsupported MIME types");
      return;
    }

    eventBase_ = connContext_.getTransport()->getEventBase();

    // Let custom setup-frame handlers claim the connection first.
    for (const auto& handler : setupFrameHandlers_) {
      auto customInfo = handler->tryHandle(meta);
      if (!customInfo) {
        continue;
      }

      processorFactory_ = std::addressof(customInfo->processorFactory_);
      serviceMetadata_ =
          std::addressof(worker_->getMetadataForService(*processorFactory_));
      serviceRequestInfoMap_ = processorFactory_->getServiceRequestInfoMap();

      processor_ = processorFactory_->getProcessor();
      const bool hasProcessor = !!processor_;

      // Allow no thread manager if resource pools are in use.
      threadManager_ = std::move(customInfo->threadManager_);
      const bool hasExecutor = !!threadManager_ || useResourcePoolsFlagsSet();

      serverConfigs_ = &customInfo->serverConfigs_;
      const bool hasServerConfigs = !!serverConfigs_;

      // Prefer the handler's registry; otherwise use the worker's.
      requestsRegistry_ = customInfo->requestsRegistry_ != nullptr
          ? customInfo->requestsRegistry_
          : worker_->getRequestsRegistry();

      if (!(hasProcessor && hasExecutor && hasServerConfigs)) {
        rejectSetup("Error in implementation of custom connection handler.");
      }
      // A handler that claimed the frame ends setup here either way.
      return;
    }

    // No custom handler claimed the frame: use the server's defaults.
    auto* const server = worker_->getServer();
    processorFactory_ = std::addressof(server->getDecoratedProcessorFactory());
    serviceMetadata_ =
        std::addressof(worker_->getMetadataForService(*processorFactory_));
    serviceRequestInfoMap_ = processorFactory_->getServiceRequestInfoMap();
    processor_ = processorFactory_->getProcessor();
    if (!useResourcePoolsFlagsSet()) {
      threadManager_ = worker_->getServer()->getThreadManager();
    }
    serverConfigs_ = worker_->getServer();
    requestsRegistry_ = worker_->getRequestsRegistry();

    // Pick up the sampling rate from the server observer, when there is one.
    if (serverConfigs_) {
      auto* observer = serverConfigs_->getObserver();
      if (observer) {
        sampleRate_ = observer->getSampleRate();
      }
    }

    const bool wantsReflection =
        meta.dscpToReflect_ref() || meta.markToReflect_ref();
    if (wantsReflection) {
      connection.applyDscpAndMarkToSocket(meta);
    }

    // Reply with a setupResponse metadata push carrying the agreed version.
    DCHECK_GE(version_, 5);
    ServerPushMetadata pushMetadata;
    pushMetadata.set_setupResponse();
    pushMetadata.setupResponse_ref()->version_ref() = version_;
    pushMetadata.setupResponse_ref()->zstdSupported_ref() = true;

    folly::IOBufQueue responseQueue;
    CompactProtocolWriter writer;
    writer.setOutput(&responseQueue);
    pushMetadata.write(&writer);
    connection.sendMetadataPush(std::move(responseQueue).move());

  } catch (const std::exception& ex) {
    rejectSetup(fmt::format(
        "Error deserializing SETUP payload: {}",
        folly::exceptionStr(ex).toStdString()));
  }
}