in router/core/graph_server.go [767:1456]
func (s *graphServer) buildGraphMux(
ctx context.Context,
opts BuildGraphMuxOptions,
) (*graphMux, error) {
gm := &graphMux{
metricStore: rmetric.NewNoopMetrics(),
}
httpRouter := chi.NewRouter()
// we only enable the attribute mapper if we are not using the default cloud exporter
baseMuxAttributes := append([]attribute.KeyValue{otel.WgRouterConfigVersion.String(opts.RouterConfigVersion)}, s.baseOtelAttributes...)
if !opts.IsBaseGraph() {
baseMuxAttributes = append(baseMuxAttributes, otel.WgFeatureFlag.String(opts.FeatureFlagName))
}
metricsEnabled := s.metricConfig.IsEnabled()
exprManager := expr.CreateNewExprManager()
// We might want to remap or exclude known attributes based on the configuration for metrics
mapper := newAttributeMapper(!rmetric.IsUsingDefaultCloudExporter(s.metricConfig), s.metricConfig.Attributes)
baseMetricAttributes := mapper.mapAttributes(baseMuxAttributes)
metricAttExpressions, attErr := newAttributeExpressions(s.metricConfig.Attributes, exprManager)
if attErr != nil {
return nil, attErr
}
var telemetryAttExpressions *attributeExpressions
if len(s.telemetryAttributes) > 0 {
var telemetryAttErr error
telemetryAttExpressions, telemetryAttErr = newAttributeExpressions(s.telemetryAttributes, exprManager)
if telemetryAttErr != nil {
return nil, telemetryAttErr
}
}
var tracingAttExpressions *attributeExpressions
if len(s.tracingAttributes) > 0 {
var tracingAttrErr error
tracingAttExpressions, tracingAttrErr = newAttributeExpressions(s.tracingAttributes, exprManager)
if tracingAttrErr != nil {
return nil, tracingAttrErr
}
}
// Prometheus metricStore rely on OTLP metricStore
if metricsEnabled {
attrKeyValues := []attribute.KeyValue{
otel.WgRouterConfigVersion.String(opts.RouterConfigVersion),
otel.WgRouterVersion.String(Version),
}
if !opts.IsBaseGraph() {
attrKeyValues = append(attrKeyValues, otel.WgFeatureFlag.String(opts.FeatureFlagName))
}
routerInfoBaseAttrs := otelmetric.WithAttributeSet(attribute.NewSet(attrKeyValues...))
// From a users perspective this is similar to engine metrics, etc
// but in this case we use the same metric store
otlpOpts := rmetric.MetricOpts{
EnableCircuitBreaker: s.metricConfig.OpenTelemetry.Enabled,
}
promOpts := rmetric.MetricOpts{
EnableCircuitBreaker: s.metricConfig.Prometheus.Enabled,
}
m, err := rmetric.NewStore(otlpOpts, promOpts,
rmetric.WithPromMeterProvider(s.promMeterProvider),
rmetric.WithOtlpMeterProvider(s.otlpMeterProvider),
rmetric.WithBaseAttributes(baseMetricAttributes),
rmetric.WithLogger(s.logger),
rmetric.WithProcessStartTime(s.processStartTime),
rmetric.WithCardinalityLimit(s.metricConfig.CardinalityLimit),
rmetric.WithRouterInfoAttributes(routerInfoBaseAttrs),
)
if err != nil {
return nil, fmt.Errorf("failed to create metric handler: %w", err)
}
gm.metricStore = m
}
// We initialize circuit breakers for all subgraphs in the base configuration (non-ff)
// so we don't duplicate circuit breakers for subgraphs and they can be used in the feature flags even
// We initialize it in the buildGraphMux because we want to use the base metric configuration
if opts.IsBaseGraph() && s.subgraphCircuitBreakerOptions.IsEnabled() {
// If either otel or prom metrics are enabled for circuit breakers
// we will enable circuit breaker metric collections
isCircuitBreakerMetricsEnabled := s.metricConfig.OpenTelemetry.CircuitBreaker || s.metricConfig.Prometheus.CircuitBreaker
err := s.circuitBreakerManager.Initialize(circuit.ManagerOpts{
SubgraphCircuitBreakers: s.subgraphCircuitBreakerOptions.SubgraphMap,
MetricStore: gm.metricStore,
UseMetrics: metricsEnabled && isCircuitBreakerMetricsEnabled,
BaseOtelAttributes: baseMetricAttributes,
AllGroupings: opts.RoutingUrlGroupings,
})
if err != nil {
return nil, err
}
}
subgraphs, err := configureSubgraphOverwrites(
opts.EngineConfig,
opts.ConfigSubgraphs,
s.overrideRoutingURLConfiguration,
s.overrides,
false,
)
if err != nil {
return nil, err
}
computeSha256, err := gm.buildOperationCaches(s)
if err != nil {
return nil, err
}
if err = gm.configureCacheMetrics(s, baseMetricAttributes); err != nil {
return nil, err
}
metrics := NewRouterMetrics(&routerMetricsConfig{
metrics: gm.metricStore,
gqlMetricsExporter: s.gqlMetricsExporter,
exportEnabled: s.graphqlMetricsConfig.Enabled,
routerConfigVersion: opts.RouterConfigVersion,
logger: s.logger,
promSchemaUsageEnabled: s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled,
promSchemaUsageIncludeOperationSha: s.metricConfig.Prometheus.PromSchemaFieldUsage.IncludeOperationSha,
})
baseLogFields := []zapcore.Field{
zap.String("config_version", opts.RouterConfigVersion),
}
if !opts.IsBaseGraph() {
baseLogFields = append(baseLogFields, zap.String("feature_flag", opts.FeatureFlagName))
}
// Currently, we only support custom attributes from the context for OTLP metrics
b := buildAttributesMap(s.metricConfig.Attributes)
// Enrich the request context with the subgraph information which is required for custom modules and tracing
subgraphResolver := NewSubgraphResolver(subgraphs)
httpRouter.Use(func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(withSubgraphResolver(r.Context(), subgraphResolver))
requestLogger := s.logger.With(logging.WithRequestID(middleware.GetReqID(r.Context())))
// If this is a batched request attach id to the logger
if batchedOperationId, ok := r.Context().Value(BatchedOperationId{}).(string); ok {
requestLogger = requestLogger.With(logging.WithBatchedRequestOperationID(batchedOperationId))
}
reqContext := buildRequestContext(requestContextOptions{
operationContext: nil,
requestLogger: requestLogger,
metricSetAttributes: b,
metricsEnabled: metricsEnabled,
traceEnabled: s.traceConfig.Enabled,
mapper: mapper,
metricAttributeExpressions: metricAttExpressions,
telemetryAttributeExpressions: telemetryAttExpressions,
tracingAttributeExpressions: tracingAttExpressions,
w: w,
r: r,
})
r = r.WithContext(withRequestContext(r.Context(), reqContext))
// For debugging purposes, we can validate from what version of the config the request is coming from
if s.setConfigVersionHeader {
w.Header().Set("X-Router-Config-Version", opts.RouterConfigVersion)
}
h.ServeHTTP(w, r)
})
})
var recoverOpts []recoveryhandler.Option
// If we have no access logger configured, we log the panic in the recovery handler to avoid losing the panic information
if s.accessLogsConfig == nil {
recoverOpts = append(recoverOpts, recoveryhandler.WithLogHandler(func(w http.ResponseWriter, r *http.Request, err any) {
reqContext := getRequestContext(r.Context())
if reqContext != nil {
reqContext.logger.Error("[Recovery from panic]",
zap.Any("error", err),
)
}
}))
}
recoveryHandler := recoveryhandler.New(recoverOpts...)
httpRouter.Use(recoveryHandler)
// Setup any router on request middlewares so that they can be used to manipulate
// other downstream internal middlewares such as tracing or authentication
httpRouter.Use(s.routerOnRequestHandlers...)
/**
* Initialize base attributes from headers and other sources
*/
var commonAttrRequestMapper func(r *http.Request) []attribute.KeyValue
if len(s.telemetryAttributes) > 0 {
// Common attributes across traces and metrics
commonAttrRequestMapper = buildHeaderAttributesMapper(s.telemetryAttributes)
}
var tracingAttrRequestMapper func(r *http.Request) []attribute.KeyValue
if len(s.tracingAttributes) > 0 {
tracingAttrRequestMapper = buildHeaderAttributesMapper(s.tracingAttributes)
}
var metricAttrRequestMapper func(r *http.Request) []attribute.KeyValue
if s.metricConfig.IsEnabled() {
// Metric attributes are only used for OTLP metrics and Prometheus metrics
metricAttrRequestMapper = buildHeaderAttributesMapper(s.metricConfig.Attributes)
}
httpRouter.Use(func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqContext := getRequestContext(r.Context())
reqContext.telemetry.addCommonTraceAttribute(baseMuxAttributes...)
if commonAttrRequestMapper != nil {
reqContext.telemetry.addCommonAttribute(commonAttrRequestMapper(r)...)
}
if tracingAttrRequestMapper != nil {
reqContext.telemetry.addCommonTraceAttribute(tracingAttrRequestMapper(r)...)
}
if metricAttrRequestMapper != nil {
reqContext.telemetry.addMetricAttribute(metricAttrRequestMapper(r)...)
}
h.ServeHTTP(w, r)
})
})
if s.traceConfig.Enabled {
f := func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqContext := getRequestContext(r.Context())
traceID := rtrace.GetTraceID(r.Context())
requestLogger := reqContext.Logger().With(logging.WithTraceID(traceID))
reqContext.logger = requestLogger
span := oteltrace.SpanFromContext(r.Context())
span.SetAttributes(reqContext.telemetry.traceAttrs...)
// Set if the trace is sampled in the expression context
isSampled := span.SpanContext().IsSampled()
reqContext.expressionContext.Request.Trace.Sampled = isSampled
// Set the trace ID in the response header
if s.traceConfig.ResponseTraceHeader.Enabled {
w.Header().Set(s.traceConfig.ResponseTraceHeader.HeaderName, traceID)
}
h.ServeHTTP(w, r)
})
}
httpRouter.Use(f)
}
var subgraphAccessLogger *requestlogger.SubgraphAccessLogger
if s.accessLogsConfig != nil && s.accessLogsConfig.Logger != nil {
exprAttributes, err := requestlogger.GetAccessLogConfigExpressions(s.accessLogsConfig.Attributes, exprManager)
if err != nil {
return nil, fmt.Errorf("failed building router access log expressions: %w", err)
}
accessLogAttributes := requestlogger.CleanupExpressionAttributes(s.accessLogsConfig.Attributes)
requestLoggerOpts := []requestlogger.Option{
requestlogger.WithDefaultOptions(),
requestlogger.WithNoTimeField(),
requestlogger.WithFields(baseLogFields...),
requestlogger.WithAttributes(accessLogAttributes),
requestlogger.WithExprAttributes(exprAttributes),
requestlogger.WithFieldsHandler(RouterAccessLogsFieldHandler),
}
var ipAnonConfig *requestlogger.IPAnonymizationConfig
if s.ipAnonymization.Enabled {
ipAnonConfig = &requestlogger.IPAnonymizationConfig{
Enabled: s.ipAnonymization.Enabled,
Method: requestlogger.IPAnonymizationMethod(s.ipAnonymization.Method),
}
requestLoggerOpts = append(requestLoggerOpts, requestlogger.WithAnonymization(ipAnonConfig))
}
requestLogger := requestlogger.New(
s.accessLogsConfig.Logger,
requestLoggerOpts...,
)
httpRouter.Use(requestLogger)
if s.accessLogsConfig.SubgraphEnabled {
subgraphExprAttributes, err := requestlogger.GetAccessLogConfigExpressions(s.accessLogsConfig.SubgraphAttributes, exprManager)
if err != nil {
return nil, fmt.Errorf("failed building router access log expressions: %w", err)
}
subgraphAttributes := requestlogger.CleanupExpressionAttributes(s.accessLogsConfig.SubgraphAttributes)
subgraphAccessLogger = requestlogger.NewSubgraphAccessLogger(
s.accessLogsConfig.Logger,
requestlogger.SubgraphOptions{
IPAnonymizationConfig: ipAnonConfig,
FieldsHandler: SubgraphAccessLogsFieldHandler,
Fields: baseLogFields,
Attributes: subgraphAttributes,
ExprAttributes: subgraphExprAttributes,
})
}
if exprManager.VisitorManager.IsResponseBodyUsedInExpressions() {
httpRouter.Use(func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
buf := bytes.Buffer{}
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
ww.Tee(&buf)
h.ServeHTTP(ww, r)
reqContext := getRequestContext(r.Context())
reqContext.expressionContext.Response.Body.Raw = buf.String()
})
})
}
}
routerEngineConfig := &RouterEngineConfiguration{
Execution: s.engineExecutionConfiguration,
Headers: s.headerRules,
Events: s.eventsConfig,
SubgraphErrorPropagation: s.subgraphErrorPropagation,
}
// map[string]*http.Transport cannot be coerced into map[string]http.RoundTripper, unfortunately
subgraphTippers := map[string]http.RoundTripper{}
for subgraph, subgraphTransport := range s.subgraphTransports {
subgraphTippers[subgraph] = subgraphTransport
}
if err := s.setupConnector(ctx, opts.EngineConfig, opts.ConfigSubgraphs); err != nil {
return nil, fmt.Errorf("failed to setup plugin host: %w", err)
}
enableTraceClient := s.connectionMetrics != nil || exprManager.VisitorManager.IsSubgraphTraceUsedInExpressions()
var baseConnMetricStore rmetric.ConnectionMetricStore = &rmetric.NoopConnectionMetricStore{}
if s.connectionMetrics != nil {
baseConnMetricStore = s.connectionMetrics
}
ecb := &ExecutorConfigurationBuilder{
introspection: s.introspection,
baseURL: s.baseURL,
baseTripper: s.baseTransport,
subgraphTrippers: subgraphTippers,
pluginHost: s.connector,
logger: s.logger,
trackUsageInfo: s.graphqlMetricsConfig.Enabled || s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled,
subscriptionClientOptions: &SubscriptionClientOptions{
PingInterval: s.engineExecutionConfiguration.WebSocketClientPingInterval,
PingTimeout: s.engineExecutionConfiguration.WebSocketClientPingTimeout,
ReadTimeout: s.engineExecutionConfiguration.WebSocketClientReadTimeout,
FrameTimeout: s.engineExecutionConfiguration.WebSocketClientFrameTimeout,
},
transportOptions: &TransportOptions{
SubgraphTransportOptions: s.subgraphTransportOptions,
PreHandlers: s.preOriginHandlers,
PostHandlers: s.postOriginHandlers,
MetricStore: gm.metricStore,
ConnectionMetricStore: baseConnMetricStore,
RetryOptions: retrytransport.RetryOptions{
Enabled: s.retryOptions.Enabled,
MaxRetryCount: s.retryOptions.MaxRetryCount,
MaxDuration: s.retryOptions.MaxDuration,
Interval: s.retryOptions.Interval,
ShouldRetry: func(err error, req *http.Request, resp *http.Response) bool {
return retrytransport.IsRetryableError(err, resp) && !isMutationRequest(req.Context())
},
},
TracerProvider: s.tracerProvider,
TracePropagators: s.compositePropagator,
LocalhostFallbackInsideDocker: s.localhostFallbackInsideDocker,
Logger: s.logger,
EnableTraceClient: enableTraceClient,
CircuitBreaker: s.circuitBreakerManager,
},
}
executor, providers, err := ecb.Build(
ctx,
&ExecutorBuildOptions{
EngineConfig: opts.EngineConfig,
Subgraphs: opts.ConfigSubgraphs,
RouterEngineConfig: routerEngineConfig,
Reporter: s.engineStats,
ApolloCompatibilityFlags: s.apolloCompatibilityFlags,
ApolloRouterCompatibilityFlags: s.apolloRouterCompatibilityFlags,
HeartbeatInterval: s.multipartHeartbeatInterval,
PluginsEnabled: s.plugins.Enabled,
InstanceData: s.instanceData,
},
)
if err != nil {
return nil, fmt.Errorf("failed to build plan configuration: %w", err)
}
s.pubSubProviders = providers
if pubSubStartupErr := s.startupPubSubProviders(ctx); pubSubStartupErr != nil {
return nil, pubSubStartupErr
}
operationProcessor := NewOperationProcessor(OperationProcessorOptions{
Executor: executor,
MaxOperationSizeInBytes: int64(s.routerTrafficConfig.MaxRequestBodyBytes),
PersistedOperationClient: s.persistedOperationClient,
AutomaticPersistedOperationCacheTtl: s.automaticPersistedQueriesConfig.Cache.TTL,
EnablePersistedOperationsCache: s.engineExecutionConfiguration.EnablePersistedOperationsCache,
PersistedOpsNormalizationCache: gm.persistedOperationCache,
NormalizationCache: gm.normalizationCache,
ValidationCache: gm.validationCache,
QueryDepthCache: gm.complexityCalculationCache,
OperationHashCache: gm.operationHashCache,
ParseKitPoolSize: s.engineExecutionConfiguration.ParseKitPoolSize,
IntrospectionEnabled: s.Config.introspection,
ParserTokenizerLimits: astparser.TokenizerLimits{
MaxDepth: s.Config.securityConfiguration.ParserLimits.ApproximateDepthLimit,
MaxFields: s.Config.securityConfiguration.ParserLimits.TotalFieldsLimit,
},
ApolloCompatibilityFlags: s.apolloCompatibilityFlags,
ApolloRouterCompatibilityFlags: s.apolloRouterCompatibilityFlags,
DisableExposingVariablesContentOnValidationError: s.engineExecutionConfiguration.DisableExposingVariablesContentOnValidationError,
ComplexityLimits: s.securityConfiguration.ComplexityLimits,
})
operationPlanner := NewOperationPlanner(executor, gm.planCache)
// We support the MCP only on the base graph. Feature flags are not supported yet.
if opts.IsBaseGraph() && s.mcpServer != nil {
if mErr := s.mcpServer.Reload(executor.ClientSchema); mErr != nil {
return nil, fmt.Errorf("failed to reload MCP server: %w", mErr)
}
}
if s.Config.cacheWarmup != nil && s.Config.cacheWarmup.Enabled {
if s.graphApiToken == "" {
return nil, fmt.Errorf("graph token is required for cache warmup in order to communicate with the CDN")
}
processor := NewCacheWarmupPlanningProcessor(&CacheWarmupPlanningProcessorOptions{
OperationProcessor: operationProcessor,
OperationPlanner: operationPlanner,
ComplexityLimits: s.securityConfiguration.ComplexityLimits,
RouterSchema: executor.RouterSchema,
TrackSchemaUsage: s.graphqlMetricsConfig.Enabled,
DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
})
warmupConfig := &CacheWarmupConfig{
Log: s.logger,
Processor: processor,
Workers: s.Config.cacheWarmup.Workers,
ItemsPerSecond: s.Config.cacheWarmup.ItemsPerSecond,
Timeout: s.Config.cacheWarmup.Timeout,
}
warmupConfig.AfterOperation = func(item *CacheWarmupOperationPlanResult) {
gm.metricStore.MeasureOperationPlanningTime(ctx,
item.PlanningTime,
nil,
otelmetric.WithAttributes(
append([]attribute.KeyValue{
otel.WgOperationName.String(item.OperationName),
otel.WgClientName.String(item.ClientName),
otel.WgClientVersion.String(item.ClientVersion),
otel.WgFeatureFlag.String(opts.FeatureFlagName),
otel.WgOperationHash.String(item.OperationHash),
otel.WgOperationType.String(item.OperationType),
otel.WgEnginePlanCacheHit.Bool(false),
}, baseMetricAttributes...)...,
),
)
}
if s.Config.cacheWarmup.Source.Filesystem != nil {
warmupConfig.Source = NewFileSystemSource(&FileSystemSourceConfig{
RootPath: s.Config.cacheWarmup.Source.Filesystem.Path,
})
} else {
cdnSource, err := NewCDNSource(s.Config.cdnConfig.URL, s.graphApiToken, s.logger)
if err != nil {
return nil, fmt.Errorf("failed to create cdn source: %w", err)
}
warmupConfig.Source = cdnSource
}
err = WarmupCaches(ctx, warmupConfig)
if err != nil {
// We don't want to fail the server if the cache warmup fails
s.logger.Error("Failed to warmup caches. It will retry after server restart or graph execution config update", zap.Error(err))
}
}
authorizerOptions := &CosmoAuthorizerOptions{
FieldConfigurations: opts.EngineConfig.FieldConfigurations,
RejectOperationIfUnauthorized: false,
}
if s.Config.authorization != nil {
authorizerOptions.RejectOperationIfUnauthorized = s.authorization.RejectOperationIfUnauthorized
}
handlerOpts := HandlerOptions{
Executor: executor,
Log: s.logger,
EnableExecutionPlanCacheResponseHeader: s.engineExecutionConfiguration.EnableExecutionPlanCacheResponseHeader,
EnablePersistedOperationCacheResponseHeader: s.engineExecutionConfiguration.Debug.EnablePersistedOperationsCacheResponseHeader,
EnableNormalizationCacheResponseHeader: s.engineExecutionConfiguration.Debug.EnableNormalizationCacheResponseHeader,
EnableResponseHeaderPropagation: s.headerRules != nil,
EngineStats: s.engineStats,
TracerProvider: s.tracerProvider,
Authorizer: NewCosmoAuthorizer(authorizerOptions),
SubgraphErrorPropagation: s.subgraphErrorPropagation,
EngineLoaderHooks: NewEngineRequestHooks(
gm.metricStore,
subgraphAccessLogger,
s.tracerProvider,
tracingAttExpressions,
telemetryAttExpressions,
metricAttExpressions,
exprManager.VisitorManager.IsSubgraphResponseBodyUsedInExpressions(),
),
}
if s.redisClient != nil {
handlerOpts.RateLimitConfig = s.rateLimit
handlerOpts.RateLimiter, err = NewCosmoRateLimiter(&CosmoRateLimiterOptions{
RedisClient: s.redisClient,
Debug: s.rateLimit.Debug,
RejectStatusCode: s.rateLimit.SimpleStrategy.RejectStatusCode,
KeySuffixExpression: s.rateLimit.KeySuffixExpression,
ExprManager: exprManager,
})
if err != nil {
return nil, fmt.Errorf("failed to create rate limiter: %w", err)
}
}
if s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled {
handlerOpts.ApolloSubscriptionMultipartPrintBoundary = s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled
}
graphqlHandler := NewGraphQLHandler(handlerOpts)
executor.Resolver.SetAsyncErrorWriter(graphqlHandler)
operationBlocker, err := NewOperationBlocker(&OperationBlockerOptions{
BlockMutations: BlockMutationOptions{
Enabled: s.securityConfiguration.BlockMutations.Enabled,
Condition: s.securityConfiguration.BlockMutations.Condition,
},
BlockSubscriptions: BlockSubscriptionOptions{
Enabled: s.securityConfiguration.BlockSubscriptions.Enabled,
Condition: s.securityConfiguration.BlockSubscriptions.Condition,
},
BlockNonPersisted: BlockNonPersistedOptions{
Enabled: s.securityConfiguration.BlockNonPersistedOperations.Enabled,
Condition: s.securityConfiguration.BlockNonPersistedOperations.Condition,
},
PersistedOperationsDisabled: s.persistedOperationsConfig.Disabled,
SafelistEnabled: s.persistedOperationsConfig.Safelist.Enabled,
LogUnknownOperationsEnabled: s.persistedOperationsConfig.LogUnknown,
exprManager: exprManager,
})
if err != nil {
return nil, fmt.Errorf("failed to create operation blocker: %w", err)
}
graphqlPreHandler := NewPreHandler(&PreHandlerOptions{
Logger: s.logger,
Executor: executor,
Metrics: metrics,
OperationProcessor: operationProcessor,
Planner: operationPlanner,
AccessController: s.accessController,
OperationBlocker: operationBlocker,
RouterPublicKey: s.publicKey,
EnableRequestTracing: s.engineExecutionConfiguration.EnableRequestTracing,
DevelopmentMode: s.developmentMode,
TracerProvider: s.tracerProvider,
FlushTelemetryAfterResponse: s.awsLambda,
TraceExportVariables: s.traceConfig.ExportGraphQLVariables.Enabled,
FileUploadEnabled: s.fileUploadConfig.Enabled,
MaxUploadFiles: s.fileUploadConfig.MaxFiles,
MaxUploadFileSize: int(s.fileUploadConfig.MaxFileSizeBytes),
ComplexityLimits: s.securityConfiguration.ComplexityLimits,
AlwaysIncludeQueryPlan: s.engineExecutionConfiguration.Debug.AlwaysIncludeQueryPlan,
AlwaysSkipLoader: s.engineExecutionConfiguration.Debug.AlwaysSkipLoader,
QueryPlansEnabled: s.Config.queryPlansEnabled,
QueryPlansLoggingEnabled: s.engineExecutionConfiguration.Debug.PrintQueryPlans,
TrackSchemaUsageInfo: s.graphqlMetricsConfig.Enabled || s.metricConfig.Prometheus.PromSchemaFieldUsage.Enabled,
ClientHeader: s.clientHeader,
ComputeOperationSha256: computeSha256,
ApolloCompatibilityFlags: &s.apolloCompatibilityFlags,
DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
ExprManager: exprManager,
OmitBatchExtensions: s.batchingConfig.OmitExtensions,
})
if s.webSocketConfiguration != nil && s.webSocketConfiguration.Enabled {
wsMiddleware := NewWebsocketMiddleware(ctx, WebsocketMiddlewareOptions{
OperationProcessor: operationProcessor,
OperationBlocker: operationBlocker,
Planner: operationPlanner,
GraphQLHandler: graphqlHandler,
PreHandler: graphqlPreHandler,
Metrics: metrics,
AccessController: s.accessController,
Logger: s.logger,
Stats: s.engineStats,
ReadTimeout: s.engineExecutionConfiguration.WebSocketClientReadTimeout,
WriteTimeout: s.engineExecutionConfiguration.WebSocketClientWriteTimeout,
EnableNetPoll: s.engineExecutionConfiguration.EnableNetPoll,
NetPollTimeout: s.engineExecutionConfiguration.WebSocketClientPollTimeout,
NetPollConnBufferSize: s.engineExecutionConfiguration.WebSocketClientConnBufferSize,
WebSocketConfiguration: s.webSocketConfiguration,
ClientHeader: s.clientHeader,
DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
ApolloCompatibilityFlags: s.apolloCompatibilityFlags,
})
// When the playground path is equal to the graphql path, we need to handle
// ws upgrades and html requests on the same route.
if s.playgroundConfig.Enabled && s.graphqlPath == s.playgroundConfig.Path {
httpRouter.Use(s.playgroundHandler, wsMiddleware)
} else {
httpRouter.Use(wsMiddleware)
}
}
httpRouter.Use(
// Responsible for handling regular GraphQL requests over HTTP not WebSockets
graphqlPreHandler.Handler,
// Must be mounted after the websocket middleware to ensure that we only count non-hijacked requests like WebSockets
func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestContext := getRequestContext(r.Context())
// We don't want to count any type of subscriptions e.g. SSE as in-flight requests because they are long-lived
if requestContext != nil && requestContext.operation != nil && requestContext.operation.opType != OperationTypeSubscription {
s.inFlightRequests.Add(1)
// Counting like this is safe because according to the go http.ServeHTTP documentation
// the requests is guaranteed to be finished when ServeHTTP returns
defer s.inFlightRequests.Sub(1)
}
handler.ServeHTTP(w, r)
})
})
// Mount built global and custom modules
// Needs to be mounted after the pre-handler to ensure that the request was parsed and authorized
httpRouter.Use(s.routerMiddlewares...)
// GraphQL over POST
httpRouter.Post(s.graphqlPath, graphqlHandler.ServeHTTP)
// GraphQL over GET
httpRouter.Get(s.graphqlPath, graphqlHandler.ServeHTTP)
gm.mux = httpRouter
s.graphMuxListLock.Lock()
defer s.graphMuxListLock.Unlock()
s.graphMuxList = append(s.graphMuxList, gm)
return gm, nil
}