// func() in router/core/graph_server.go [767:1456]


// buildGraphMux assembles the HTTP router (mux) that serves a single graph
// configuration: either the base graph or a feature-flag graph, as reported by
// opts.IsBaseGraph().
//
// Roughly in order, it:
//   - creates the metric store (a no-op store unless metrics are enabled),
//   - initializes the circuit breaker manager (base graph only),
//   - resolves subgraph overwrites, builds the operation caches and cache metrics,
//   - installs the middleware chain: request context, panic recovery, router
//     on-request handlers, telemetry attributes, tracing and access logs,
//   - sets up the connector, builds the executor and starts the pub/sub providers,
//   - optionally reloads the MCP server (base graph only) and warms up caches,
//   - creates the GraphQL handler, rate limiter, operation blocker, pre-handler
//     and the optional WebSocket middleware,
//   - mounts the GraphQL handler for POST and GET on s.graphqlPath.
//
// On success the resulting graphMux is appended to s.graphMuxList (guarded by
// s.graphMuxListLock) and returned. Setup failures are returned as errors; the
// only exception is an error from WarmupCaches, which is logged and ignored.
func (s *graphServer) buildGraphMux(
	ctx context.Context,
	opts BuildGraphMuxOptions,
) (*graphMux, error) {
	// Start with a no-op metric store so that gm.metricStore is always usable,
	// even when metrics are disabled.
	gm := &graphMux{
		metricStore: rmetric.NewNoopMetrics(),
	}

	httpRouter := chi.NewRouter()

	// Attributes shared by every request handled by this mux. The slice literal
	// guarantees a fresh backing array, so s.baseOtelAttributes is never mutated.
	baseMuxAttributes := append([]attribute.KeyValue{otel.WgRouterConfigVersion.String(opts.RouterConfigVersion)}, s.baseOtelAttributes...)

	if !opts.IsBaseGraph() {
		baseMuxAttributes = append(baseMuxAttributes, otel.WgFeatureFlag.String(opts.FeatureFlagName))
	}

	metricsEnabled := s.metricConfig.IsEnabled()

	// A single expression manager is shared by all expression consumers below
	// (telemetry attributes, access logs, rate limiter, operation blocker, pre-handler).
	exprManager := expr.CreateNewExprManager()

	// We might want to remap or exclude known attributes based on the configuration for metrics.
	// The attribute mapper is only enabled if we are not using the default cloud exporter.
	mapper := newAttributeMapper(!rmetric.IsUsingDefaultCloudExporter(s.metricConfig), s.metricConfig.Attributes)
	baseMetricAttributes := mapper.mapAttributes(baseMuxAttributes)

	metricAttExpressions, attErr := newAttributeExpressions(s.metricConfig.Attributes, exprManager)
	if attErr != nil {
		return nil, attErr
	}
	// Telemetry and tracing attribute expressions stay nil when no attributes are configured.
	var telemetryAttExpressions *attributeExpressions
	if len(s.telemetryAttributes) > 0 {
		var telemetryAttErr error
		telemetryAttExpressions, telemetryAttErr = newAttributeExpressions(s.telemetryAttributes, exprManager)
		if telemetryAttErr != nil {
			return nil, telemetryAttErr
		}
	}

	var tracingAttExpressions *attributeExpressions
	if len(s.tracingAttributes) > 0 {
		var tracingAttrErr error
		tracingAttExpressions, tracingAttrErr = newAttributeExpressions(s.tracingAttributes, exprManager)
		if tracingAttrErr != nil {
			return nil, tracingAttrErr
		}
	}

	// The Prometheus metric store relies on the OTLP metric store
	if metricsEnabled {
		attrKeyValues := []attribute.KeyValue{
			otel.WgRouterConfigVersion.String(opts.RouterConfigVersion),
			otel.WgRouterVersion.String(Version),
		}
		if !opts.IsBaseGraph() {
			attrKeyValues = append(attrKeyValues, otel.WgFeatureFlag.String(opts.FeatureFlagName))
		}
		routerInfoBaseAttrs := otelmetric.WithAttributeSet(attribute.NewSet(attrKeyValues...))

		// From a user's perspective this is similar to engine metrics, etc.,
		// but in this case we use the same metric store.
		// Circuit breaker instruments are toggled by the dedicated CircuitBreaker
		// flag of each exporter (the same flags consulted when the circuit breaker
		// manager is initialized below), not by the exporter's general Enabled flag.
		otlpOpts := rmetric.MetricOpts{
			EnableCircuitBreaker: s.metricConfig.OpenTelemetry.CircuitBreaker,
		}
		promOpts := rmetric.MetricOpts{
			EnableCircuitBreaker: s.metricConfig.Prometheus.CircuitBreaker,
		}

		m, err := rmetric.NewStore(otlpOpts, promOpts,
			rmetric.WithPromMeterProvider(s.promMeterProvider),
			rmetric.WithOtlpMeterProvider(s.otlpMeterProvider),
			rmetric.WithBaseAttributes(baseMetricAttributes),
			rmetric.WithLogger(s.logger),
			rmetric.WithProcessStartTime(s.processStartTime),
			rmetric.WithCardinalityLimit(s.metricConfig.CardinalityLimit),
			rmetric.WithRouterInfoAttributes(routerInfoBaseAttrs),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to create metric handler: %w", err)
		}

		gm.metricStore = m
	}

	// We initialize circuit breakers for all subgraphs in the base configuration (non-ff)
	// so we don't duplicate circuit breakers for subgraphs and they can also be used by feature flags.
	// We initialize it in the buildGraphMux because we want to use the base metric configuration
	if opts.IsBaseGraph() && s.subgraphCircuitBreakerOptions.IsEnabled() {
		// If either otel or prom metrics are enabled for circuit breakers
		// we will enable circuit breaker metric collections
		isCircuitBreakerMetricsEnabled := s.metricConfig.OpenTelemetry.CircuitBreaker || s.metricConfig.Prometheus.CircuitBreaker

		err := s.circuitBreakerManager.Initialize(circuit.ManagerOpts{
			SubgraphCircuitBreakers: s.subgraphCircuitBreakerOptions.SubgraphMap,
			MetricStore:             gm.metricStore,
			UseMetrics:              metricsEnabled && isCircuitBreakerMetricsEnabled,
			BaseOtelAttributes:      baseMetricAttributes,
			AllGroupings:            opts.RoutingUrlGroupings,
		})
		if err != nil {
			return nil, err
		}
	}

	subgraphs, err := configureSubgraphOverwrites(
		opts.EngineConfig,
		opts.ConfigSubgraphs,
		s.overrideRoutingURLConfiguration,
		s.overrides,
		false,
	)
	if err != nil {
		return nil, err
	}

	computeSha256, err := gm.buildOperationCaches(s)
	if err != nil {
		return nil, err
	}

	if err = gm.configureCacheMetrics(s, baseMetricAttributes); err != nil {
		return nil, err
	}

	metrics := NewRouterMetrics(&routerMetricsConfig{
		metrics:             gm.metricStore,
		gqlMetricsExporter:  s.gqlMetricsExporter,
		exportEnabled:       s.graphqlMetricsConfig.Enabled,
		routerConfigVersion: opts.RouterConfigVersion,
		logger:              s.logger,

		promSchemaUsageEnabled:             s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled,
		promSchemaUsageIncludeOperationSha: s.metricConfig.Prometheus.PromSchemaFieldUsage.IncludeOperationSha,
	})

	// Log fields attached to router and subgraph access logs of this mux.
	baseLogFields := []zapcore.Field{
		zap.String("config_version", opts.RouterConfigVersion),
	}

	if !opts.IsBaseGraph() {
		baseLogFields = append(baseLogFields, zap.String("feature_flag", opts.FeatureFlagName))
	}

	// Currently, we only support custom attributes from the context for OTLP metrics
	b := buildAttributesMap(s.metricConfig.Attributes)

	// Enrich the request context with the subgraph information which is required for custom modules and tracing
	subgraphResolver := NewSubgraphResolver(subgraphs)
	httpRouter.Use(func(h http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			r = r.WithContext(withSubgraphResolver(r.Context(), subgraphResolver))

			requestLogger := s.logger.With(logging.WithRequestID(middleware.GetReqID(r.Context())))
			// If this is a batched request attach id to the logger
			if batchedOperationId, ok := r.Context().Value(BatchedOperationId{}).(string); ok {
				requestLogger = requestLogger.With(logging.WithBatchedRequestOperationID(batchedOperationId))
			}
			// The operation context is not known yet at this point of the chain, hence nil.
			reqContext := buildRequestContext(requestContextOptions{
				operationContext:              nil,
				requestLogger:                 requestLogger,
				metricSetAttributes:           b,
				metricsEnabled:                metricsEnabled,
				traceEnabled:                  s.traceConfig.Enabled,
				mapper:                        mapper,
				metricAttributeExpressions:    metricAttExpressions,
				telemetryAttributeExpressions: telemetryAttExpressions,
				tracingAttributeExpressions:   tracingAttExpressions,
				w:                             w,
				r:                             r,
			})

			// All middlewares registered after this one read the request context from r.Context().
			r = r.WithContext(withRequestContext(r.Context(), reqContext))

			// For debugging purposes, we can validate from what version of the config the request is coming from
			if s.setConfigVersionHeader {
				w.Header().Set("X-Router-Config-Version", opts.RouterConfigVersion)
			}

			h.ServeHTTP(w, r)
		})
	})

	var recoverOpts []recoveryhandler.Option

	// If we have no access logger configured, we log the panic in the recovery handler to avoid losing the panic information
	if s.accessLogsConfig == nil {
		recoverOpts = append(recoverOpts, recoveryhandler.WithLogHandler(func(w http.ResponseWriter, r *http.Request, err any) {
			reqContext := getRequestContext(r.Context())
			if reqContext != nil {
				reqContext.logger.Error("[Recovery from panic]",
					zap.Any("error", err),
				)
			}
		}))
	}

	recoveryHandler := recoveryhandler.New(recoverOpts...)

	httpRouter.Use(recoveryHandler)

	// Setup any router on request middlewares so that they can be used to manipulate
	// other downstream internal middlewares such as tracing or authentication
	httpRouter.Use(s.routerOnRequestHandlers...)

	/**
	* Initialize base attributes from headers and other sources
	 */

	// Each mapper stays nil when its source is not configured; the middleware
	// below skips nil mappers.
	var commonAttrRequestMapper func(r *http.Request) []attribute.KeyValue
	if len(s.telemetryAttributes) > 0 {
		// Common attributes across traces and metrics
		commonAttrRequestMapper = buildHeaderAttributesMapper(s.telemetryAttributes)
	}

	var tracingAttrRequestMapper func(r *http.Request) []attribute.KeyValue
	if len(s.tracingAttributes) > 0 {
		tracingAttrRequestMapper = buildHeaderAttributesMapper(s.tracingAttributes)
	}

	var metricAttrRequestMapper func(r *http.Request) []attribute.KeyValue
	if s.metricConfig.IsEnabled() {
		// Metric attributes are only used for OTLP metrics and Prometheus metrics
		metricAttrRequestMapper = buildHeaderAttributesMapper(s.metricConfig.Attributes)
	}

	httpRouter.Use(func(h http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// The request context was attached by the first middleware of this mux.
			reqContext := getRequestContext(r.Context())

			reqContext.telemetry.addCommonTraceAttribute(baseMuxAttributes...)

			if commonAttrRequestMapper != nil {
				reqContext.telemetry.addCommonAttribute(commonAttrRequestMapper(r)...)
			}
			if tracingAttrRequestMapper != nil {
				reqContext.telemetry.addCommonTraceAttribute(tracingAttrRequestMapper(r)...)
			}
			if metricAttrRequestMapper != nil {
				reqContext.telemetry.addMetricAttribute(metricAttrRequestMapper(r)...)
			}

			h.ServeHTTP(w, r)
		})
	})

	if s.traceConfig.Enabled {
		f := func(h http.Handler) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				reqContext := getRequestContext(r.Context())
				traceID := rtrace.GetTraceID(r.Context())
				// From here on, the request logger carries the trace ID.
				requestLogger := reqContext.Logger().With(logging.WithTraceID(traceID))

				reqContext.logger = requestLogger

				span := oteltrace.SpanFromContext(r.Context())
				span.SetAttributes(reqContext.telemetry.traceAttrs...)

				// Set if the trace is sampled in the expression context
				isSampled := span.SpanContext().IsSampled()
				reqContext.expressionContext.Request.Trace.Sampled = isSampled

				// Set the trace ID in the response header
				if s.traceConfig.ResponseTraceHeader.Enabled {
					w.Header().Set(s.traceConfig.ResponseTraceHeader.HeaderName, traceID)
				}

				h.ServeHTTP(w, r)
			})
		}
		httpRouter.Use(f)
	}

	// Stays nil unless subgraph access logs are enabled below.
	var subgraphAccessLogger *requestlogger.SubgraphAccessLogger
	if s.accessLogsConfig != nil && s.accessLogsConfig.Logger != nil {
		exprAttributes, err := requestlogger.GetAccessLogConfigExpressions(s.accessLogsConfig.Attributes, exprManager)
		if err != nil {
			return nil, fmt.Errorf("failed building router access log expressions: %w", err)
		}

		accessLogAttributes := requestlogger.CleanupExpressionAttributes(s.accessLogsConfig.Attributes)

		requestLoggerOpts := []requestlogger.Option{
			requestlogger.WithDefaultOptions(),
			requestlogger.WithNoTimeField(),
			requestlogger.WithFields(baseLogFields...),
			requestlogger.WithAttributes(accessLogAttributes),
			requestlogger.WithExprAttributes(exprAttributes),
			requestlogger.WithFieldsHandler(RouterAccessLogsFieldHandler),
		}

		// The same anonymization config (nil when disabled) is shared by the
		// router and the subgraph access logger.
		var ipAnonConfig *requestlogger.IPAnonymizationConfig
		if s.ipAnonymization.Enabled {
			ipAnonConfig = &requestlogger.IPAnonymizationConfig{
				Enabled: s.ipAnonymization.Enabled,
				Method:  requestlogger.IPAnonymizationMethod(s.ipAnonymization.Method),
			}
			requestLoggerOpts = append(requestLoggerOpts, requestlogger.WithAnonymization(ipAnonConfig))
		}

		requestLogger := requestlogger.New(
			s.accessLogsConfig.Logger,
			requestLoggerOpts...,
		)
		httpRouter.Use(requestLogger)

		if s.accessLogsConfig.SubgraphEnabled {
			subgraphExprAttributes, err := requestlogger.GetAccessLogConfigExpressions(s.accessLogsConfig.SubgraphAttributes, exprManager)
			if err != nil {
				return nil, fmt.Errorf("failed building subgraph access log expressions: %w", err)
			}

			subgraphAttributes := requestlogger.CleanupExpressionAttributes(s.accessLogsConfig.SubgraphAttributes)

			subgraphAccessLogger = requestlogger.NewSubgraphAccessLogger(
				s.accessLogsConfig.Logger,
				requestlogger.SubgraphOptions{
					IPAnonymizationConfig: ipAnonConfig,
					FieldsHandler:         SubgraphAccessLogsFieldHandler,
					Fields:                baseLogFields,
					Attributes:            subgraphAttributes,
					ExprAttributes:        subgraphExprAttributes,
				})
		}

		// Only pay for buffering the response body when an expression actually reads it.
		if exprManager.VisitorManager.IsResponseBodyUsedInExpressions() {
			httpRouter.Use(func(h http.Handler) http.Handler {
				return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
					buf := bytes.Buffer{}
					ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)

					// Copy everything written to the client into buf as well.
					ww.Tee(&buf)

					h.ServeHTTP(ww, r)

					reqContext := getRequestContext(r.Context())
					reqContext.expressionContext.Response.Body.Raw = buf.String()
				})
			})
		}
	}

	routerEngineConfig := &RouterEngineConfiguration{
		Execution:                s.engineExecutionConfiguration,
		Headers:                  s.headerRules,
		Events:                   s.eventsConfig,
		SubgraphErrorPropagation: s.subgraphErrorPropagation,
	}

	// map[string]*http.Transport cannot be coerced into map[string]http.RoundTripper, unfortunately
	subgraphTrippers := map[string]http.RoundTripper{}
	for subgraph, subgraphTransport := range s.subgraphTransports {
		subgraphTrippers[subgraph] = subgraphTransport
	}

	if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs); err != nil {
		return nil, fmt.Errorf("failed to setup plugin host: %w", err)
	}

	// The trace client is needed for connection metrics or when an expression uses subgraph trace data.
	enableTraceClient := s.connectionMetrics != nil || exprManager.VisitorManager.IsSubgraphTraceUsedInExpressions()

	// Fall back to a no-op store so the transport never receives a nil connection metric store.
	var baseConnMetricStore rmetric.ConnectionMetricStore = &rmetric.NoopConnectionMetricStore{}
	if s.connectionMetrics != nil {
		baseConnMetricStore = s.connectionMetrics
	}

	ecb := &ExecutorConfigurationBuilder{
		introspection:    s.introspection,
		baseURL:          s.baseURL,
		baseTripper:      s.baseTransport,
		subgraphTrippers: subgraphTrippers,
		pluginHost:       s.connector,
		logger:           s.logger,
		trackUsageInfo:   s.graphqlMetricsConfig.Enabled || s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled,
		subscriptionClientOptions: &SubscriptionClientOptions{
			PingInterval: s.engineExecutionConfiguration.WebSocketClientPingInterval,
			PingTimeout:  s.engineExecutionConfiguration.WebSocketClientPingTimeout,
			ReadTimeout:  s.engineExecutionConfiguration.WebSocketClientReadTimeout,
			FrameTimeout: s.engineExecutionConfiguration.WebSocketClientFrameTimeout,
		},
		transportOptions: &TransportOptions{
			SubgraphTransportOptions: s.subgraphTransportOptions,
			PreHandlers:              s.preOriginHandlers,
			PostHandlers:             s.postOriginHandlers,
			MetricStore:              gm.metricStore,
			ConnectionMetricStore:    baseConnMetricStore,
			RetryOptions: retrytransport.RetryOptions{
				Enabled:       s.retryOptions.Enabled,
				MaxRetryCount: s.retryOptions.MaxRetryCount,
				MaxDuration:   s.retryOptions.MaxDuration,
				Interval:      s.retryOptions.Interval,
				// Mutations are never retried, even on otherwise retryable errors.
				ShouldRetry: func(err error, req *http.Request, resp *http.Response) bool {
					return retrytransport.IsRetryableError(err, resp) && !isMutationRequest(req.Context())
				},
			},
			TracerProvider:                s.tracerProvider,
			TracePropagators:              s.compositePropagator,
			LocalhostFallbackInsideDocker: s.localhostFallbackInsideDocker,
			Logger:                        s.logger,
			EnableTraceClient:             enableTraceClient,
			CircuitBreaker:                s.circuitBreakerManager,
		},
	}

	executor, providers, err := ecb.Build(
		ctx,
		&ExecutorBuildOptions{
			EngineConfig:                   opts.EngineConfig,
			Subgraphs:                      opts.ConfigSubgraphs,
			RouterEngineConfig:             routerEngineConfig,
			Reporter:                       s.engineStats,
			ApolloCompatibilityFlags:       s.apolloCompatibilityFlags,
			ApolloRouterCompatibilityFlags: s.apolloRouterCompatibilityFlags,
			HeartbeatInterval:              s.multipartHeartbeatInterval,
			PluginsEnabled:                 s.plugins.Enabled,
			InstanceData:                   s.instanceData,
		},
	)
	if err != nil {
		return nil, fmt.Errorf("failed to build plan configuration: %w", err)
	}

	// The providers are stored on the server before startupPubSubProviders is called.
	s.pubSubProviders = providers
	if pubSubStartupErr := s.startupPubSubProviders(ctx); pubSubStartupErr != nil {
		return nil, pubSubStartupErr
	}

	operationProcessor := NewOperationProcessor(OperationProcessorOptions{
		Executor:                            executor,
		MaxOperationSizeInBytes:             int64(s.routerTrafficConfig.MaxRequestBodyBytes),
		PersistedOperationClient:            s.persistedOperationClient,
		AutomaticPersistedOperationCacheTtl: s.automaticPersistedQueriesConfig.Cache.TTL,
		EnablePersistedOperationsCache:      s.engineExecutionConfiguration.EnablePersistedOperationsCache,
		PersistedOpsNormalizationCache:      gm.persistedOperationCache,
		NormalizationCache:                  gm.normalizationCache,
		ValidationCache:                     gm.validationCache,
		QueryDepthCache:                     gm.complexityCalculationCache,
		OperationHashCache:                  gm.operationHashCache,
		ParseKitPoolSize:                    s.engineExecutionConfiguration.ParseKitPoolSize,
		IntrospectionEnabled:                s.Config.introspection,
		ParserTokenizerLimits: astparser.TokenizerLimits{
			MaxDepth:  s.Config.securityConfiguration.ParserLimits.ApproximateDepthLimit,
			MaxFields: s.Config.securityConfiguration.ParserLimits.TotalFieldsLimit,
		},
		ApolloCompatibilityFlags:                         s.apolloCompatibilityFlags,
		ApolloRouterCompatibilityFlags:                   s.apolloRouterCompatibilityFlags,
		DisableExposingVariablesContentOnValidationError: s.engineExecutionConfiguration.DisableExposingVariablesContentOnValidationError,
		ComplexityLimits:                                 s.securityConfiguration.ComplexityLimits,
	})
	operationPlanner := NewOperationPlanner(executor, gm.planCache)

	// We support the MCP only on the base graph. Feature flags are not supported yet.
	if opts.IsBaseGraph() && s.mcpServer != nil {
		if mErr := s.mcpServer.Reload(executor.ClientSchema); mErr != nil {
			return nil, fmt.Errorf("failed to reload MCP server: %w", mErr)
		}
	}

	if s.Config.cacheWarmup != nil && s.Config.cacheWarmup.Enabled {

		// NOTE(review): this check also applies when the filesystem source is
		// configured below, although the message only mentions the CDN. Confirm
		// whether the token is really required in that case.
		if s.graphApiToken == "" {
			return nil, fmt.Errorf("graph token is required for cache warmup in order to communicate with the CDN")
		}

		processor := NewCacheWarmupPlanningProcessor(&CacheWarmupPlanningProcessorOptions{
			OperationProcessor:        operationProcessor,
			OperationPlanner:          operationPlanner,
			ComplexityLimits:          s.securityConfiguration.ComplexityLimits,
			RouterSchema:              executor.RouterSchema,
			TrackSchemaUsage:          s.graphqlMetricsConfig.Enabled,
			DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
		})

		warmupConfig := &CacheWarmupConfig{
			Log:            s.logger,
			Processor:      processor,
			Workers:        s.Config.cacheWarmup.Workers,
			ItemsPerSecond: s.Config.cacheWarmup.ItemsPerSecond,
			Timeout:        s.Config.cacheWarmup.Timeout,
		}

		// Record the planning time of every warmed-up operation. The plan cache
		// hit attribute is always false here.
		warmupConfig.AfterOperation = func(item *CacheWarmupOperationPlanResult) {
			gm.metricStore.MeasureOperationPlanningTime(ctx,
				item.PlanningTime,
				nil,
				otelmetric.WithAttributes(
					append([]attribute.KeyValue{
						otel.WgOperationName.String(item.OperationName),
						otel.WgClientName.String(item.ClientName),
						otel.WgClientVersion.String(item.ClientVersion),
						otel.WgFeatureFlag.String(opts.FeatureFlagName),
						otel.WgOperationHash.String(item.OperationHash),
						otel.WgOperationType.String(item.OperationType),
						otel.WgEnginePlanCacheHit.Bool(false),
					}, baseMetricAttributes...)...,
				),
			)
		}

		// A configured filesystem source takes precedence; otherwise operations are loaded from the CDN.
		if s.Config.cacheWarmup.Source.Filesystem != nil {
			warmupConfig.Source = NewFileSystemSource(&FileSystemSourceConfig{
				RootPath: s.Config.cacheWarmup.Source.Filesystem.Path,
			})
		} else {
			cdnSource, err := NewCDNSource(s.Config.cdnConfig.URL, s.graphApiToken, s.logger)
			if err != nil {
				return nil, fmt.Errorf("failed to create cdn source: %w", err)
			}
			warmupConfig.Source = cdnSource
		}

		err = WarmupCaches(ctx, warmupConfig)
		if err != nil {
			// We don't want to fail the server if the cache warmup fails
			s.logger.Error("Failed to warmup caches. It will retry after server restart or graph execution config update", zap.Error(err))
		}
	}

	// Unauthorized operations are not rejected unless the authorization config says so.
	authorizerOptions := &CosmoAuthorizerOptions{
		FieldConfigurations:           opts.EngineConfig.FieldConfigurations,
		RejectOperationIfUnauthorized: false,
	}

	if s.Config.authorization != nil {
		authorizerOptions.RejectOperationIfUnauthorized = s.authorization.RejectOperationIfUnauthorized
	}

	handlerOpts := HandlerOptions{
		Executor:                               executor,
		Log:                                    s.logger,
		EnableExecutionPlanCacheResponseHeader: s.engineExecutionConfiguration.EnableExecutionPlanCacheResponseHeader,
		EnablePersistedOperationCacheResponseHeader: s.engineExecutionConfiguration.Debug.EnablePersistedOperationsCacheResponseHeader,
		EnableNormalizationCacheResponseHeader:      s.engineExecutionConfiguration.Debug.EnableNormalizationCacheResponseHeader,
		EnableResponseHeaderPropagation:             s.headerRules != nil,
		EngineStats:                                 s.engineStats,
		TracerProvider:                              s.tracerProvider,
		Authorizer:                                  NewCosmoAuthorizer(authorizerOptions),
		SubgraphErrorPropagation:                    s.subgraphErrorPropagation,
		EngineLoaderHooks: NewEngineRequestHooks(
			gm.metricStore,
			subgraphAccessLogger,
			s.tracerProvider,
			tracingAttExpressions,
			telemetryAttExpressions,
			metricAttExpressions,
			exprManager.VisitorManager.IsSubgraphResponseBodyUsedInExpressions(),
		),
	}

	// A rate limiter is only created when a Redis client is available.
	if s.redisClient != nil {
		handlerOpts.RateLimitConfig = s.rateLimit
		handlerOpts.RateLimiter, err = NewCosmoRateLimiter(&CosmoRateLimiterOptions{
			RedisClient:         s.redisClient,
			Debug:               s.rateLimit.Debug,
			RejectStatusCode:    s.rateLimit.SimpleStrategy.RejectStatusCode,
			KeySuffixExpression: s.rateLimit.KeySuffixExpression,
			ExprManager:         exprManager,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to create rate limiter: %w", err)
		}
	}

	if s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled {
		handlerOpts.ApolloSubscriptionMultipartPrintBoundary = s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled
	}

	graphqlHandler := NewGraphQLHandler(handlerOpts)
	// The GraphQL handler also serves as the resolver's async error writer.
	executor.Resolver.SetAsyncErrorWriter(graphqlHandler)

	operationBlocker, err := NewOperationBlocker(&OperationBlockerOptions{
		BlockMutations: BlockMutationOptions{
			Enabled:   s.securityConfiguration.BlockMutations.Enabled,
			Condition: s.securityConfiguration.BlockMutations.Condition,
		},
		BlockSubscriptions: BlockSubscriptionOptions{
			Enabled:   s.securityConfiguration.BlockSubscriptions.Enabled,
			Condition: s.securityConfiguration.BlockSubscriptions.Condition,
		},
		BlockNonPersisted: BlockNonPersistedOptions{
			Enabled:   s.securityConfiguration.BlockNonPersistedOperations.Enabled,
			Condition: s.securityConfiguration.BlockNonPersistedOperations.Condition,
		},
		PersistedOperationsDisabled: s.persistedOperationsConfig.Disabled,
		SafelistEnabled:             s.persistedOperationsConfig.Safelist.Enabled,
		LogUnknownOperationsEnabled: s.persistedOperationsConfig.LogUnknown,
		exprManager:                 exprManager,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create operation blocker: %w", err)
	}

	graphqlPreHandler := NewPreHandler(&PreHandlerOptions{
		Logger:                      s.logger,
		Executor:                    executor,
		Metrics:                     metrics,
		OperationProcessor:          operationProcessor,
		Planner:                     operationPlanner,
		AccessController:            s.accessController,
		OperationBlocker:            operationBlocker,
		RouterPublicKey:             s.publicKey,
		EnableRequestTracing:        s.engineExecutionConfiguration.EnableRequestTracing,
		DevelopmentMode:             s.developmentMode,
		TracerProvider:              s.tracerProvider,
		FlushTelemetryAfterResponse: s.awsLambda,
		TraceExportVariables:        s.traceConfig.ExportGraphQLVariables.Enabled,
		FileUploadEnabled:           s.fileUploadConfig.Enabled,
		MaxUploadFiles:              s.fileUploadConfig.MaxFiles,
		MaxUploadFileSize:           int(s.fileUploadConfig.MaxFileSizeBytes),
		ComplexityLimits:            s.securityConfiguration.ComplexityLimits,
		AlwaysIncludeQueryPlan:      s.engineExecutionConfiguration.Debug.AlwaysIncludeQueryPlan,
		AlwaysSkipLoader:            s.engineExecutionConfiguration.Debug.AlwaysSkipLoader,
		QueryPlansEnabled:           s.Config.queryPlansEnabled,
		QueryPlansLoggingEnabled:    s.engineExecutionConfiguration.Debug.PrintQueryPlans,
		TrackSchemaUsageInfo:        s.graphqlMetricsConfig.Enabled || s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled,
		ClientHeader:                s.clientHeader,
		ComputeOperationSha256:      computeSha256,
		ApolloCompatibilityFlags:    &s.apolloCompatibilityFlags,
		DisableVariablesRemapping:   s.engineExecutionConfiguration.DisableVariablesRemapping,
		ExprManager:                 exprManager,
		OmitBatchExtensions:         s.batchingConfig.OmitExtensions,
	})

	if s.webSocketConfiguration != nil && s.webSocketConfiguration.Enabled {
		wsMiddleware := NewWebsocketMiddleware(ctx, WebsocketMiddlewareOptions{
			OperationProcessor:        operationProcessor,
			OperationBlocker:          operationBlocker,
			Planner:                   operationPlanner,
			GraphQLHandler:            graphqlHandler,
			PreHandler:                graphqlPreHandler,
			Metrics:                   metrics,
			AccessController:          s.accessController,
			Logger:                    s.logger,
			Stats:                     s.engineStats,
			ReadTimeout:               s.engineExecutionConfiguration.WebSocketClientReadTimeout,
			WriteTimeout:              s.engineExecutionConfiguration.WebSocketClientWriteTimeout,
			EnableNetPoll:             s.engineExecutionConfiguration.EnableNetPoll,
			NetPollTimeout:            s.engineExecutionConfiguration.WebSocketClientPollTimeout,
			NetPollConnBufferSize:     s.engineExecutionConfiguration.WebSocketClientConnBufferSize,
			WebSocketConfiguration:    s.webSocketConfiguration,
			ClientHeader:              s.clientHeader,
			DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
			ApolloCompatibilityFlags:  s.apolloCompatibilityFlags,
		})

		// When the playground path is equal to the graphql path, we need to handle
		// ws upgrades and html requests on the same route.
		if s.playgroundConfig.Enabled && s.graphqlPath == s.playgroundConfig.Path {
			httpRouter.Use(s.playgroundHandler, wsMiddleware)
		} else {
			httpRouter.Use(wsMiddleware)
		}
	}

	httpRouter.Use(
		// Responsible for handling regular GraphQL requests over HTTP not WebSockets
		graphqlPreHandler.Handler,
		// Must be mounted after the websocket middleware to ensure that we only count non-hijacked requests like WebSockets
		func(handler http.Handler) http.Handler {
			return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				requestContext := getRequestContext(r.Context())

				// We don't want to count any type of subscriptions e.g. SSE as in-flight requests because they are long-lived
				if requestContext != nil && requestContext.operation != nil && requestContext.operation.opType != OperationTypeSubscription {
					s.inFlightRequests.Add(1)

					// Counting like this is safe because according to the go http.ServeHTTP documentation
					// the requests is guaranteed to be finished when ServeHTTP returns
					defer s.inFlightRequests.Sub(1)
				}

				handler.ServeHTTP(w, r)
			})
		})

	// Mount built global and custom modules
	// Needs to be mounted after the pre-handler to ensure that the request was parsed and authorized
	httpRouter.Use(s.routerMiddlewares...)

	// GraphQL over POST
	httpRouter.Post(s.graphqlPath, graphqlHandler.ServeHTTP)
	// GraphQL over GET
	httpRouter.Get(s.graphqlPath, graphqlHandler.ServeHTTP)

	gm.mux = httpRouter

	// Register the mux on the server; the list is guarded by graphMuxListLock.
	s.graphMuxListLock.Lock()
	defer s.graphMuxListLock.Unlock()
	s.graphMuxList = append(s.graphMuxList, gm)

	return gm, nil
}