in router/core/graph_server.go [104:268]
// newGraphServer assembles a graphServer for the given router execution config.
//
// It rejects execution configs whose compatibility version the router does not
// support, derives a cancellable context for the server, prepares the base
// OpenTelemetry attributes, optionally starts runtime metrics, sets up engine
// statistics, parses the graph public key when registration info is present,
// and finally wires the chi router: global middlewares, the GraphQL handler
// (base mux plus feature flag muxes), the optional playground and the health
// check routes.
//
// On any error the returned server is nil and the derived context is cancelled,
// so work bound to that context does not outlive the failed construction.
func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterConfig, proxy ProxyFunc) (*graphServer, error) {
	/* Older versions of composition will not populate a compatibility version.
	 * Currently, all "old" router execution configurations are compatible as there have been no breaking
	 * changes.
	 * Upon the first breaking change to the execution config, an unpopulated compatibility version will
	 * also be unsupported (and the logic for IsRouterCompatibleWithExecutionConfig will need to be updated).
	 */
	if !execution_config.IsRouterCompatibleWithExecutionConfig(r.logger, routerConfig.CompatibilityVersion) {
		return nil, fmt.Errorf(`the compatibility version "%s" is not compatible with this router version`, routerConfig.CompatibilityVersion)
	}

	ctx, cancel := context.WithCancel(ctx)

	// The cancel func is only reachable through the returned server. When
	// construction fails the caller receives nil, so the derived context has
	// to be released here or it lives until the parent context ends.
	built := false
	defer func() {
		if !built {
			cancel()
		}
	}()

	s := &graphServer{
		context:                 ctx,
		cancelFunc:              cancel,
		Config:                  &r.Config,
		engineStats:             r.EngineStats,
		executionTransport:      newHTTPTransport(r.subgraphTransportOptions.TransportRequestOptions, proxy),
		executionTransportProxy: proxy,
		playgroundHandler:       r.playgroundHandler,
		baseRouterConfigVersion: routerConfig.GetVersion(),
		inFlightRequests:        &atomic.Uint64{},
		graphMuxList:            make([]*graphMux, 0, 1),
		routerListenAddr:        r.listenAddr,
		hostName:                r.hostName,
		pubSubProviders: &EnginePubSubProviders{
			nats:  map[string]pubsub_datasource.NatsPubSub{},
			kafka: map[string]pubsub_datasource.KafkaPubSub{},
		},
	}

	baseOtelAttributes := []attribute.KeyValue{
		otel.WgRouterVersion.String(Version),
		otel.WgRouterClusterName.String(r.clusterName),
	}

	// The federated graph ID is only known when a graph API token is configured.
	if s.graphApiToken != "" {
		claims, err := rjwt.ExtractFederatedGraphTokenClaims(s.graphApiToken)
		if err != nil {
			return nil, err
		}
		baseOtelAttributes = append(baseOtelAttributes, otel.WgFederatedGraphID.String(claims.FederatedGraphID))
	}

	s.baseOtelAttributes = baseOtelAttributes

	if s.metricConfig.OpenTelemetry.RouterRuntime {
		s.runtimeMetrics = rmetric.NewRuntimeMetrics(
			s.logger,
			s.otlpMeterProvider,
			// We track runtime metrics with base router config version
			append([]attribute.KeyValue{
				otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion),
			}, baseOtelAttributes...),
			s.processStartTime,
		)

		// Start runtime metrics
		if err := s.runtimeMetrics.Start(); err != nil {
			return nil, err
		}
	}

	if err := s.setupEngineStatistics(); err != nil {
		return nil, fmt.Errorf("failed to setup engine statistics: %w", err)
	}

	if s.registrationInfo != nil {
		publicKey, err := jwt.ParseECPublicKeyFromPEM([]byte(s.registrationInfo.GetGraphPublicKey()))
		if err != nil {
			return nil, fmt.Errorf("failed to parse router public key: %w", err)
		}
		s.publicKey = publicKey
	}

	httpRouter := chi.NewRouter()

	/**
	 * Middlewares
	 */

	// This recovery handler is used for everything before the graph mux to ensure that
	// we can recover from panics and log them properly.
	httpRouter.Use(recoveryhandler.New(recoveryhandler.WithLogHandler(func(w http.ResponseWriter, r *http.Request, err any) {
		s.logger.Error("[Recovery from panic]",
			zap.Any("error", err),
		)
	})))

	// Request traffic shaping related middlewares
	httpRouter.Use(rmiddleware.RequestSize(int64(s.routerTrafficConfig.MaxRequestBodyBytes)))
	if s.routerTrafficConfig.DecompressionEnabled {
		httpRouter.Use(rmiddleware.HandleCompression(s.logger))
	}

	httpRouter.Use(middleware.RequestID)
	httpRouter.Use(middleware.RealIP)
	if s.corsOptions.Enabled {
		httpRouter.Use(cors.New(*s.corsOptions))
	}

	// The base mux is built with an empty feature flag name and the base config version.
	gm, err := s.buildGraphMux(ctx, "", s.baseRouterConfigVersion, routerConfig.GetEngineConfig(), routerConfig.GetSubgraphs())
	if err != nil {
		return nil, fmt.Errorf("failed to build base mux: %w", err)
	}

	featureFlagConfigMap := routerConfig.FeatureFlagConfigs.GetConfigByFeatureFlagName()
	if len(featureFlagConfigMap) > 0 {
		s.logger.Info("Feature flags enabled", zap.Strings("flags", maps.Keys(featureFlagConfigMap)))
	}

	multiGraphHandler, err := s.buildMultiGraphHandler(ctx, gm.mux, featureFlagConfigMap)
	if err != nil {
		return nil, fmt.Errorf("failed to build feature flag handler: %w", err)
	}

	wrapper, err := gzhttp.NewWrapper(
		gzhttp.MinSize(1024*4), // 4KB
		gzhttp.CompressionLevel(gzip.DefaultCompression),
		gzhttp.ContentTypes(CompressibleContentTypes),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create gzip wrapper: %w", err)
	}

	/**
	 * A group where we can selectively apply middlewares to the graphql endpoint
	 */
	httpRouter.Group(func(cr chi.Router) {
		// We are applying it conditionally because compressing 3MB playground is still slow even with stdlib gzip
		cr.Use(func(h http.Handler) http.Handler {
			return wrapper(h)
		})

		if s.headerRules != nil {
			cr.Use(rmiddleware.CookieWhitelist(s.headerRules.CookieWhitelist, []string{featureFlagCookie}))
		}

		// Mount the feature flag handler. It calls the base mux if no feature flag is set.
		cr.Handle(r.graphqlPath, multiGraphHandler)

		if r.webSocketConfiguration != nil && r.webSocketConfiguration.Enabled && r.webSocketConfiguration.AbsintheProtocol.Enabled {
			// Mount the Absinthe protocol handler for WebSockets
			// NOTE(review): this route is registered on httpRouter, not cr, so the
			// group-only middlewares above (gzip wrapper, cookie whitelist) presumably
			// do not apply to it — confirm this is intentional.
			httpRouter.Handle(r.webSocketConfiguration.AbsintheProtocol.HandlerPath, multiGraphHandler)
		}
	})

	/**
	 * Routes
	 */

	// We mount the playground once here when we don't have a conflict with the websocket handler
	// If we have a conflict, we mount the playground during building the individual muxes
	if s.playgroundHandler != nil && s.graphqlPath != s.playgroundConfig.Path {
		httpRouter.Get(r.playgroundConfig.Path, s.playgroundHandler(nil).ServeHTTP)
	}

	// The generic health check path is served by the liveness handler.
	httpRouter.Get(s.healthCheckPath, r.healthcheck.Liveness())
	httpRouter.Get(s.livenessCheckPath, r.healthcheck.Liveness())
	httpRouter.Get(s.readinessCheckPath, r.healthcheck.Readiness())

	s.mux = httpRouter

	// Ownership of cancel passes to the returned server from here on.
	built = true

	return s, nil
}