in router/core/graph_server.go [599:1175]
// buildGraphMux assembles the HTTP mux that serves GraphQL traffic for one
// engine configuration and registers it on the server.
//
// The steps run in this order, and the order of the httpRouter.Use calls is
// significant because later middlewares read state that earlier ones attach to
// the request context:
//
//  1. metric store, attribute mapper and attribute expressions
//  2. subgraph overrides, operation caches and cache metrics
//  3. request-context middleware, panic recovery, router on-request handlers
//  4. telemetry attribute middleware, optional tracing, optional access logs
//  5. pubsub configuration, executor, operation processor and planner
//  6. optional cache warmup
//  7. GraphQL handler, optional rate limiter, operation blocker, pre-handler
//  8. optional WebSocket middleware, in-flight counter, custom router middlewares
//  9. POST and GET routes on s.graphqlPath
//
// Parameters:
//   - ctx is handed to the pubsub configuration, the executor build, the cache
//     warmup and the WebSocket middleware; it is also captured by the warmup
//     AfterOperation callback.
//   - featureFlagName, when non-empty, is added to the base OTEL attributes and
//     to the base log fields.
//   - routerConfigVersion is attached to OTEL attributes, log fields, router
//     metrics and (optionally) the X-Router-Config-Version response header.
//   - engineConfig and configSubgraphs describe the engine configuration and the
//     subgraphs the executor is built from.
//
// On success the returned graphMux has its mux set and has been appended to
// s.graphMuxList under s.graphMuxListLock. On failure nil and the error of the
// failing step are returned.
func (s *graphServer) buildGraphMux(ctx context.Context,
featureFlagName string,
routerConfigVersion string,
engineConfig *nodev1.EngineConfiguration,
configSubgraphs []*nodev1.Subgraph,
) (*graphMux, error) {
// Start with a no-op metric store; it is swapped for a real one below when metrics are enabled.
gm := &graphMux{
metricStore: rmetric.NewNoopMetrics(),
}
httpRouter := chi.NewRouter()
// The config version comes first, followed by the server-wide attributes. Appending to a
// fresh slice literal keeps s.baseOtelAttributes itself unmodified.
baseOtelAttributes := append([]attribute.KeyValue{otel.WgRouterConfigVersion.String(routerConfigVersion)}, s.baseOtelAttributes...)
if featureFlagName != "" {
baseOtelAttributes = append(baseOtelAttributes, otel.WgFeatureFlag.String(featureFlagName))
}
metricsEnabled := s.metricConfig.IsEnabled()
// we only enable the attribute mapper if we are not using the default cloud exporter
enableAttributeMapper := !(s.metricConfig.IsUsingCloudExporter || rmetric.IsDefaultCloudExporterConfigured(s.metricConfig.OpenTelemetry.Exporters))
// We might want to remap or exclude known attributes based on the configuration for metrics
mapper := newAttributeMapper(enableAttributeMapper, s.metricConfig.Attributes)
attExpressions, attErr := newAttributeExpressions(s.metricConfig.Attributes)
if attErr != nil {
return nil, attErr
}
// Metric attributes are the base OTEL attributes after they went through the mapper.
baseMetricAttributes := mapper.mapAttributes(baseOtelAttributes)
// Stays nil when no telemetry attributes are configured.
var telemetryAttExpressions *attributeExpressions
if len(s.telemetryAttributes) > 0 {
var telemetryAttErr error
telemetryAttExpressions, telemetryAttErr = newAttributeExpressions(s.telemetryAttributes)
if telemetryAttErr != nil {
return nil, telemetryAttErr
}
}
// Prometheus metricStore rely on OTLP metricStore
// A single store is created with both the Prometheus and the OTLP meter provider.
if metricsEnabled {
m, err := rmetric.NewStore(
rmetric.WithPromMeterProvider(s.promMeterProvider),
rmetric.WithOtlpMeterProvider(s.otlpMeterProvider),
rmetric.WithBaseAttributes(baseMetricAttributes),
rmetric.WithLogger(s.logger),
rmetric.WithProcessStartTime(s.processStartTime),
rmetric.WithCardinalityLimit(rmetric.DefaultCardinalityLimit),
)
if err != nil {
return nil, fmt.Errorf("failed to create metric handler: %w", err)
}
gm.metricStore = m
}
// Apply the configured routing URL overrides; the result feeds the subgraph resolver below.
subgraphs, err := configureSubgraphOverwrites(
engineConfig,
configSubgraphs,
s.overrideRoutingURLConfiguration,
s.overrides,
)
if err != nil {
return nil, err
}
// The first return value is forwarded to the pre-handler as ComputeOperationSha256.
computeSha256, err := gm.buildOperationCaches(s)
if err != nil {
return nil, err
}
if err = gm.configureCacheMetrics(s, baseMetricAttributes); err != nil {
return nil, err
}
metrics := NewRouterMetrics(&routerMetricsConfig{
metrics: gm.metricStore,
gqlMetricsExporter: s.gqlMetricsExporter,
exportEnabled: s.graphqlMetricsConfig.Enabled,
routerConfigVersion: routerConfigVersion,
logger: s.logger,
})
// Log fields shared by the router access logger and the subgraph access logger.
baseLogFields := []zapcore.Field{
zap.String("config_version", routerConfigVersion),
}
if featureFlagName != "" {
baseLogFields = append(baseLogFields, zap.String("feature_flag", featureFlagName))
}
// Currently, we only support custom attributes from the context for OTLP metrics
b := buildAttributesMap(s.metricConfig.Attributes)
// Enrich the request context with the subgraph information which is required for custom modules and tracing
subgraphResolver := NewSubgraphResolver(subgraphs)
// First middleware: attaches the subgraph resolver and a fresh request context to every
// request. The middlewares registered after it call getRequestContext and rely on this.
httpRouter.Use(func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestLogger := s.logger.With(logging.WithRequestID(middleware.GetReqID(r.Context())))
r = r.WithContext(withSubgraphResolver(r.Context(), subgraphResolver))
reqContext := buildRequestContext(requestContextOptions{
operationContext: nil,
requestLogger: requestLogger,
metricSetAttributes: b,
metricsEnabled: metricsEnabled,
traceEnabled: s.traceConfig.Enabled,
mapper: mapper,
metricAttributeExpressions: attExpressions,
telemetryAttributeExpressions: telemetryAttExpressions,
w: w,
r: r,
})
r = r.WithContext(withRequestContext(r.Context(), reqContext))
// For debugging purposes, we can validate from what version of the config the request is coming from
if s.setConfigVersionHeader {
w.Header().Set("X-Router-Config-Version", routerConfigVersion)
}
h.ServeHTTP(w, r)
})
})
var recoverOpts []recoveryhandler.Option
// If we have no access logger configured, we log the panic in the recovery handler to avoid losing the panic information
if s.accessLogsConfig == nil {
recoverOpts = append(recoverOpts, recoveryhandler.WithLogHandler(func(w http.ResponseWriter, r *http.Request, err any) {
reqContext := getRequestContext(r.Context())
// Nothing is logged when the request carries no request context.
if reqContext != nil {
reqContext.logger.Error("[Recovery from panic]",
zap.Any("error", err),
)
}
}))
}
recoveryHandler := recoveryhandler.New(recoverOpts...)
httpRouter.Use(recoveryHandler)
// Setup any router on request middlewares so that they can be used to manipulate
// other downstream internal middlewares such as tracing or authentication
httpRouter.Use(s.routerOnRequestHandlers...)
/**
* Initialize base attributes from headers and other sources
*/
// Both mappers stay nil when their precondition is not met; the middleware below checks for nil.
var commonAttrRequestMapper func(r *http.Request) []attribute.KeyValue
if len(s.telemetryAttributes) > 0 {
// Common attributes across traces and metrics
commonAttrRequestMapper = buildHeaderAttributesMapper(s.telemetryAttributes)
}
var metricAttrRequestMapper func(r *http.Request) []attribute.KeyValue
// Metric attributes are only used for OTLP metrics and Prometheus metrics
if s.metricConfig.IsEnabled() {
metricAttrRequestMapper = buildHeaderAttributesMapper(s.metricConfig.Attributes)
}
// Seeds the request telemetry with the base attributes and the per-request mapped attributes.
// It dereferences the request context without a nil check, so it depends on the first
// middleware having run.
httpRouter.Use(func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqContext := getRequestContext(r.Context())
reqContext.telemetry.addCommonTraceAttribute(baseOtelAttributes...)
// NOTE(review): baseOtelAttributes already starts with this config version attribute,
// so this call looks redundant — confirm before removing.
reqContext.telemetry.addCommonTraceAttribute(otel.WgRouterConfigVersion.String(routerConfigVersion))
if commonAttrRequestMapper != nil {
reqContext.telemetry.addCommonAttribute(commonAttrRequestMapper(r)...)
}
if metricAttrRequestMapper != nil {
reqContext.telemetry.addMetricAttribute(metricAttrRequestMapper(r)...)
}
h.ServeHTTP(w, r)
})
})
// Tracing middleware is only mounted when tracing is enabled.
if s.traceConfig.Enabled {
spanStartOptions := []oteltrace.SpanStartOption{
oteltrace.WithAttributes(
otel.RouterServerAttribute,
otel.WgRouterRootSpan.Bool(true),
),
}
if s.traceConfig.WithNewRoot {
spanStartOptions = append(spanStartOptions, oteltrace.WithNewRoot())
}
middlewareOptions := []otelhttp.Option{
otelhttp.WithSpanOptions(spanStartOptions...),
otelhttp.WithFilter(rtrace.CommonRequestFilter),
// The health, readiness and liveness paths are handed to a prefix-based request filter.
otelhttp.WithFilter(rtrace.PrefixRequestFilter(
[]string{s.healthCheckPath, s.readinessCheckPath, s.livenessCheckPath}),
),
// Disable built-in metricStore through NoopMeterProvider
// NOTE(review): the value passed is an SDK meter provider created without options,
// not a dedicated noop type — presumably it exports nothing; confirm.
otelhttp.WithMeterProvider(sdkmetric.NewMeterProvider()),
otelhttp.WithSpanNameFormatter(SpanNameFormatter),
otelhttp.WithTracerProvider(s.tracerProvider),
}
if s.compositePropagator != nil {
middlewareOptions = append(middlewareOptions, otelhttp.WithPropagators(s.compositePropagator))
}
traceHandler := rtrace.NewMiddleware(
rtrace.WithTracePreHandler(
func(r *http.Request, w http.ResponseWriter) {
reqContext := getRequestContext(r.Context())
traceID := rtrace.GetTraceID(r.Context())
// Replace the request logger with one that also carries the trace ID.
requestLogger := reqContext.Logger().With(logging.WithTraceID(traceID))
reqContext.logger = requestLogger
// Copy the trace attributes collected so far onto the span found in the request context.
span := oteltrace.SpanFromContext(r.Context())
span.SetAttributes(reqContext.telemetry.traceAttrs...)
// Set the trace ID in the response header
if s.traceConfig.ResponseTraceHeader.Enabled {
w.Header().Set(s.traceConfig.ResponseTraceHeader.HeaderName, traceID)
}
}),
rtrace.WithOtelHttp(middlewareOptions...),
)
httpRouter.Use(traceHandler.Handler)
}
// Remains nil unless subgraph access logs are enabled; it is passed to the engine request hooks below.
var subgraphAccessLogger *requestlogger.SubgraphAccessLogger
if s.accessLogsConfig != nil && s.accessLogsConfig.Logger != nil {
// Expressions are built from the attribute list before that list is cleaned up below.
exprAttributes, err := requestlogger.GetAccessLogConfigExpressions(s.accessLogsConfig.Attributes)
if err != nil {
return nil, fmt.Errorf("failed building router access log expressions: %w", err)
}
// NOTE(review): this reassigns a field on s, so a later buildGraphMux call on the same
// server would start from the already cleaned-up list — confirm that is intended.
s.accessLogsConfig.Attributes = requestlogger.CleanupExpressionAttributes(s.accessLogsConfig.Attributes)
requestLoggerOpts := []requestlogger.Option{
requestlogger.WithDefaultOptions(),
requestlogger.WithNoTimeField(),
requestlogger.WithFields(baseLogFields...),
requestlogger.WithAttributes(s.accessLogsConfig.Attributes),
requestlogger.WithExprAttributes(exprAttributes),
requestlogger.WithFieldsHandler(RouterAccessLogsFieldHandler),
}
// Stays nil when IP anonymization is disabled; the subgraph logger receives the same value.
var ipAnonConfig *requestlogger.IPAnonymizationConfig
if s.ipAnonymization.Enabled {
ipAnonConfig = &requestlogger.IPAnonymizationConfig{
Enabled: s.ipAnonymization.Enabled,
Method: requestlogger.IPAnonymizationMethod(s.ipAnonymization.Method),
}
requestLoggerOpts = append(requestLoggerOpts, requestlogger.WithAnonymization(ipAnonConfig))
}
requestLogger := requestlogger.New(
s.accessLogsConfig.Logger,
requestLoggerOpts...,
)
httpRouter.Use(requestLogger)
if s.accessLogsConfig.SubgraphEnabled {
// NOTE(review): same in-place reassignment on s as for the router attributes — confirm.
s.accessLogsConfig.SubgraphAttributes = requestlogger.CleanupExpressionAttributes(s.accessLogsConfig.SubgraphAttributes)
subgraphAccessLogger = requestlogger.NewSubgraphAccessLogger(
s.accessLogsConfig.Logger,
requestlogger.SubgraphOptions{
IPAnonymizationConfig: ipAnonConfig,
FieldsHandler: SubgraphAccessLogsFieldHandler,
Fields: baseLogFields,
Attributes: s.accessLogsConfig.SubgraphAttributes,
})
}
}
// Engine-level configuration; it is passed to buildPubSubConfiguration and then to the executor build.
routerEngineConfig := &RouterEngineConfiguration{
Execution: s.engineExecutionConfiguration,
Headers: s.headerRules,
Events: s.eventsConfig,
SubgraphErrorPropagation: s.subgraphErrorPropagation,
}
err = s.buildPubSubConfiguration(ctx, engineConfig, routerEngineConfig)
if err != nil {
return nil, fmt.Errorf("failed to build pubsub configuration: %w", err)
}
ecb := &ExecutorConfigurationBuilder{
introspection: s.introspection,
baseURL: s.baseURL,
transport: s.executionTransport,
logger: s.logger,
trackUsageInfo: s.graphqlMetricsConfig.Enabled,
transportOptions: &TransportOptions{
Proxy: s.executionTransportProxy,
SubgraphTransportOptions: s.subgraphTransportOptions,
PreHandlers: s.preOriginHandlers,
PostHandlers: s.postOriginHandlers,
MetricStore: gm.metricStore,
RetryOptions: retrytransport.RetryOptions{
Enabled: s.retryOptions.Enabled,
MaxRetryCount: s.retryOptions.MaxRetryCount,
MaxDuration: s.retryOptions.MaxDuration,
Interval: s.retryOptions.Interval,
// Retry only retryable errors, and never when the request context marks a mutation.
ShouldRetry: func(err error, req *http.Request, resp *http.Response) bool {
return retrytransport.IsRetryableError(err, resp) && !isMutationRequest(req.Context())
},
},
TracerProvider: s.tracerProvider,
TracePropagators: s.compositePropagator,
LocalhostFallbackInsideDocker: s.localhostFallbackInsideDocker,
Logger: s.logger,
},
}
// Note that the executor is built from configSubgraphs, not from the overridden subgraphs list above.
executor, err := ecb.Build(
ctx,
&ExecutorBuildOptions{
EngineConfig: engineConfig,
Subgraphs: configSubgraphs,
RouterEngineConfig: routerEngineConfig,
PubSubProviders: s.pubSubProviders,
Reporter: s.engineStats,
ApolloCompatibilityFlags: s.apolloCompatibilityFlags,
ApolloRouterCompatibilityFlags: s.apolloRouterCompatibilityFlags,
HeartbeatInterval: s.multipartHeartbeatInterval,
},
)
if err != nil {
return nil, fmt.Errorf("failed to build plan configuration: %w", err)
}
// The operation processor receives the caches that buildOperationCaches stored on gm.
operationProcessor := NewOperationProcessor(OperationProcessorOptions{
Executor: executor,
MaxOperationSizeInBytes: int64(s.routerTrafficConfig.MaxRequestBodyBytes),
PersistedOperationClient: s.persistedOperationClient,
AutomaticPersistedOperationCacheTtl: s.automaticPersistedQueriesConfig.Cache.TTL,
EnablePersistedOperationsCache: s.engineExecutionConfiguration.EnablePersistedOperationsCache,
PersistedOpsNormalizationCache: gm.persistedOperationCache,
NormalizationCache: gm.normalizationCache,
ValidationCache: gm.validationCache,
QueryDepthCache: gm.complexityCalculationCache,
OperationHashCache: gm.operationHashCache,
ParseKitPoolSize: s.engineExecutionConfiguration.ParseKitPoolSize,
IntrospectionEnabled: s.Config.introspection,
ApolloCompatibilityFlags: s.apolloCompatibilityFlags,
ApolloRouterCompatibilityFlags: s.apolloRouterCompatibilityFlags,
})
operationPlanner := NewOperationPlanner(executor, gm.planCache)
// Optional cache warmup. It runs synchronously here, before the handlers are built.
if s.Config.cacheWarmup != nil && s.Config.cacheWarmup.Enabled {
// NOTE(review): the token is required before the source is chosen, so it is also
// required when a filesystem source is configured — confirm this is intended.
if s.graphApiToken == "" {
return nil, fmt.Errorf("graph token is required for cache warmup in order to communicate with the CDN")
}
processor := NewCacheWarmupPlanningProcessor(&CacheWarmupPlanningProcessorOptions{
OperationProcessor: operationProcessor,
OperationPlanner: operationPlanner,
ComplexityLimits: s.securityConfiguration.ComplexityLimits,
RouterSchema: executor.RouterSchema,
TrackSchemaUsage: s.graphqlMetricsConfig.Enabled,
DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
})
warmupConfig := &CacheWarmupConfig{
Log: s.logger,
Processor: processor,
Workers: s.Config.cacheWarmup.Workers,
ItemsPerSecond: s.Config.cacheWarmup.ItemsPerSecond,
Timeout: s.Config.cacheWarmup.Timeout,
}
// Records the planning time of each warmed-up operation, always with plan cache hit = false.
warmupConfig.AfterOperation = func(item *CacheWarmupOperationPlanResult) {
gm.metricStore.MeasureOperationPlanningTime(ctx,
item.PlanningTime,
nil,
otelmetric.WithAttributes(
// Appending to a fresh literal leaves baseMetricAttributes untouched across calls.
append([]attribute.KeyValue{
otel.WgOperationName.String(item.OperationName),
otel.WgClientName.String(item.ClientName),
otel.WgClientVersion.String(item.ClientVersion),
otel.WgFeatureFlag.String(featureFlagName),
otel.WgOperationHash.String(item.OperationHash),
otel.WgOperationType.String(item.OperationType),
otel.WgEnginePlanCacheHit.Bool(false),
}, baseMetricAttributes...)...,
),
)
}
// A configured filesystem source takes precedence; otherwise the CDN is used.
if s.Config.cacheWarmup.Source.Filesystem != nil {
warmupConfig.Source = NewFileSystemSource(&FileSystemSourceConfig{
RootPath: s.Config.cacheWarmup.Source.Filesystem.Path,
})
} else {
cdnSource, err := NewCDNSource(s.Config.cdnConfig.URL, s.graphApiToken, s.logger)
if err != nil {
return nil, fmt.Errorf("failed to create cdn source: %w", err)
}
warmupConfig.Source = cdnSource
}
err = WarmupCaches(ctx, warmupConfig)
if err != nil {
// We don't want to fail the server if the cache warmup fails
s.logger.Error("Failed to warmup caches. It will retry after server restart or graph execution config update", zap.Error(err))
}
}
// Unauthorized operations are not rejected unless the authorization config says so.
authorizerOptions := &CosmoAuthorizerOptions{
FieldConfigurations: engineConfig.FieldConfigurations,
RejectOperationIfUnauthorized: false,
}
// NOTE(review): the nil check reads s.Config.authorization while the value is read through
// s.authorization — presumably Config is embedded in s and both name the same field; confirm.
if s.Config.authorization != nil {
authorizerOptions.RejectOperationIfUnauthorized = s.authorization.RejectOperationIfUnauthorized
}
handlerOpts := HandlerOptions{
Executor: executor,
Log: s.logger,
EnableExecutionPlanCacheResponseHeader: s.engineExecutionConfiguration.EnableExecutionPlanCacheResponseHeader,
EnablePersistedOperationCacheResponseHeader: s.engineExecutionConfiguration.Debug.EnablePersistedOperationsCacheResponseHeader,
EnableNormalizationCacheResponseHeader: s.engineExecutionConfiguration.Debug.EnableNormalizationCacheResponseHeader,
EnableResponseHeaderPropagation: s.headerRules != nil,
EngineStats: s.engineStats,
TracerProvider: s.tracerProvider,
Authorizer: NewCosmoAuthorizer(authorizerOptions),
SubgraphErrorPropagation: s.subgraphErrorPropagation,
EngineLoaderHooks: NewEngineRequestHooks(gm.metricStore, subgraphAccessLogger, s.tracerProvider),
}
// The rate limiter is only created when a Redis client is present.
if s.redisClient != nil {
handlerOpts.RateLimitConfig = s.rateLimit
handlerOpts.RateLimiter, err = NewCosmoRateLimiter(&CosmoRateLimiterOptions{
RedisClient: s.redisClient,
Debug: s.rateLimit.Debug,
RejectStatusCode: s.rateLimit.SimpleStrategy.RejectStatusCode,
KeySuffixExpression: s.rateLimit.KeySuffixExpression,
})
if err != nil {
return nil, fmt.Errorf("failed to create rate limiter: %w", err)
}
}
if s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled {
handlerOpts.ApolloSubscriptionMultipartPrintBoundary = s.apolloCompatibilityFlags.SubscriptionMultipartPrintBoundary.Enabled
}
graphqlHandler := NewGraphQLHandler(handlerOpts)
// The GraphQL handler is also registered as the resolver's async error writer.
executor.Resolver.SetAsyncErrorWriter(graphqlHandler)
operationBlocker, err := NewOperationBlocker(&OperationBlockerOptions{
BlockMutations: BlockMutationOptions{
Enabled: s.securityConfiguration.BlockMutations.Enabled,
Condition: s.securityConfiguration.BlockMutations.Condition,
},
BlockSubscriptions: BlockSubscriptionOptions{
Enabled: s.securityConfiguration.BlockSubscriptions.Enabled,
Condition: s.securityConfiguration.BlockSubscriptions.Condition,
},
BlockNonPersisted: BlockNonPersistedOptions{
Enabled: s.securityConfiguration.BlockNonPersistedOperations.Enabled,
Condition: s.securityConfiguration.BlockNonPersistedOperations.Condition,
},
SafelistEnabled: s.persistedOperationsConfig.Safelist.Enabled,
LogUnknownOperationsEnabled: s.persistedOperationsConfig.LogUnknown,
})
if err != nil {
return nil, fmt.Errorf("failed to create operation blocker: %w", err)
}
// The pre-handler is mounted as middleware below and is also handed to the WebSocket middleware.
graphqlPreHandler := NewPreHandler(&PreHandlerOptions{
Logger: s.logger,
Executor: executor,
Metrics: metrics,
OperationProcessor: operationProcessor,
Planner: operationPlanner,
AccessController: s.accessController,
OperationBlocker: operationBlocker,
RouterPublicKey: s.publicKey,
EnableRequestTracing: s.engineExecutionConfiguration.EnableRequestTracing,
DevelopmentMode: s.developmentMode,
TracerProvider: s.tracerProvider,
FlushTelemetryAfterResponse: s.awsLambda,
TraceExportVariables: s.traceConfig.ExportGraphQLVariables.Enabled,
FileUploadEnabled: s.fileUploadConfig.Enabled,
MaxUploadFiles: s.fileUploadConfig.MaxFiles,
MaxUploadFileSize: int(s.fileUploadConfig.MaxFileSizeBytes),
ComplexityLimits: s.securityConfiguration.ComplexityLimits,
AlwaysIncludeQueryPlan: s.engineExecutionConfiguration.Debug.AlwaysIncludeQueryPlan,
AlwaysSkipLoader: s.engineExecutionConfiguration.Debug.AlwaysSkipLoader,
QueryPlansEnabled: s.Config.queryPlansEnabled,
QueryPlansLoggingEnabled: s.engineExecutionConfiguration.Debug.PrintQueryPlans,
TrackSchemaUsageInfo: s.graphqlMetricsConfig.Enabled,
ClientHeader: s.clientHeader,
ComputeOperationSha256: computeSha256,
ApolloCompatibilityFlags: &s.apolloCompatibilityFlags,
DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
})
// The WebSocket middleware is mounted before the pre-handler middleware below.
if s.webSocketConfiguration != nil && s.webSocketConfiguration.Enabled {
wsMiddleware := NewWebsocketMiddleware(ctx, WebsocketMiddlewareOptions{
OperationProcessor: operationProcessor,
OperationBlocker: operationBlocker,
Planner: operationPlanner,
GraphQLHandler: graphqlHandler,
PreHandler: graphqlPreHandler,
Metrics: metrics,
AccessController: s.accessController,
Logger: s.logger,
Stats: s.engineStats,
ReadTimeout: s.engineExecutionConfiguration.WebSocketClientReadTimeout,
EnableNetPoll: s.engineExecutionConfiguration.EnableNetPoll,
NetPollTimeout: s.engineExecutionConfiguration.WebSocketClientPollTimeout,
NetPollConnBufferSize: s.engineExecutionConfiguration.WebSocketClientConnBufferSize,
WebSocketConfiguration: s.webSocketConfiguration,
ClientHeader: s.clientHeader,
Attributes: baseOtelAttributes,
DisableVariablesRemapping: s.engineExecutionConfiguration.DisableVariablesRemapping,
ApolloCompatibilityFlags: s.apolloCompatibilityFlags,
})
// When the playground path is equal to the graphql path, we need to handle
// ws upgrades and html requests on the same route.
if s.playgroundConfig.Enabled && s.graphqlPath == s.playgroundConfig.Path {
httpRouter.Use(s.playgroundHandler, wsMiddleware)
} else {
httpRouter.Use(wsMiddleware)
}
}
httpRouter.Use(
// Responsible for handling regular GraphQL requests over HTTP not WebSockets
graphqlPreHandler.Handler,
// Must be mounted after the websocket middleware to ensure that we only count non-hijacked requests like WebSockets
func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestContext := getRequestContext(r.Context())
// We don't want to count any type of subscriptions e.g. SSE as in-flight requests because they are long-lived
// Requests without a request context or without a parsed operation are not counted either.
if requestContext != nil && requestContext.operation != nil && requestContext.operation.opType != OperationTypeSubscription {
s.inFlightRequests.Add(1)
// Counting like this is safe because according to the go http.ServeHTTP documentation
// the requests is guaranteed to be finished when ServeHTTP returns
defer s.inFlightRequests.Sub(1)
}
handler.ServeHTTP(w, r)
})
})
// Mount built global and custom modules
// Needs to be mounted after the pre-handler to ensure that the request was parsed and authorized
httpRouter.Use(s.routerMiddlewares...)
// GraphQL over POST
httpRouter.Post(s.graphqlPath, graphqlHandler.ServeHTTP)
// GraphQL over GET
httpRouter.Get(s.graphqlPath, graphqlHandler.ServeHTTP)
gm.mux = httpRouter
// Register the finished mux on the server; the lock guards the append to s.graphMuxList.
s.graphMuxListLock.Lock()
defer s.graphMuxListLock.Unlock()
s.graphMuxList = append(s.graphMuxList, gm)
return gm, nil
}