in router/core/graph_server.go [117:376]
// newGraphServer builds a graphServer for the given router execution config.
//
// It rejects execution configs whose compatibility version is not supported by
// this router, creates the base and per-subgraph HTTP transports, sets up the
// optional runtime, connection and engine metrics, builds the base graph mux and
// the multi graph (feature flag) handler, and finally wires everything into a
// chi router that is stored on the returned server as its mux.
//
// The passed ctx is wrapped in a cancellable context which is stored on the
// server together with its cancel function. If construction fails after that
// point, the derived context is cancelled before returning so it is not leaked.
//
// On any error the returned server is nil.
func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterConfig, proxy ProxyFunc) (*graphServer, error) {
	/* Older versions of composition will not populate a compatibility version.
	 * Currently, all "old" router execution configurations are compatible as there have been no breaking
	 * changes.
	 * Upon the first breaking change to the execution config, an unpopulated compatibility version will
	 * also be unsupported (and the logic for IsRouterCompatibleWithExecutionConfig will need to be updated).
	 */
	if !execution_config.IsRouterCompatibleWithExecutionConfig(r.logger, routerConfig.CompatibilityVersion) {
		return nil, fmt.Errorf(`the compatibility version "%s" is not compatible with this router version`, routerConfig.CompatibilityVersion)
	}

	isConnStoreEnabled := r.Config.metricConfig.OpenTelemetry.ConnectionStats || r.Config.metricConfig.Prometheus.ConnectionStats

	// The trace dialer is only created when connection stats are enabled for at
	// least one exporter; otherwise a nil dialer is handed to the transports.
	var traceDialer *TraceDialer
	if isConnStoreEnabled {
		traceDialer = NewTraceDialer()
	}

	// Base transport
	baseTransport := newHTTPTransport(r.subgraphTransportOptions.TransportRequestOptions, proxy, traceDialer, "")

	// Subgraph transports
	subgraphTransports := map[string]*http.Transport{}
	for subgraph, subgraphOpts := range r.subgraphTransportOptions.SubgraphMap {
		subgraphBaseTransport := newHTTPTransport(subgraphOpts, proxy, traceDialer, subgraph)
		subgraphTransports[subgraph] = subgraphBaseTransport
	}

	ctx, cancel := context.WithCancel(ctx)

	// The cancel function is only reachable through the returned server. When we
	// bail out with an error the server is discarded, so cancel the derived
	// context here to release it and to signal anything already built against it.
	initialized := false
	defer func() {
		if !initialized {
			cancel()
		}
	}()

	s := &graphServer{
		context:                 ctx,
		cancelFunc:              cancel,
		Config:                  &r.Config,
		engineStats:             r.EngineStats,
		baseTransport:           baseTransport,
		subgraphTransports:      subgraphTransports,
		playgroundHandler:       r.playgroundHandler,
		traceDialer:             traceDialer,
		baseRouterConfigVersion: routerConfig.GetVersion(),
		inFlightRequests:        &atomic.Uint64{},
		graphMuxList:            make([]*graphMux, 0, 1),
		instanceData: InstanceData{
			HostName:      r.hostName,
			ListenAddress: r.listenAddr,
		},
		storageProviders: &r.storageProviders,
	}

	baseOtelAttributes := []attribute.KeyValue{
		otel.WgRouterVersion.String(Version),
		otel.WgRouterClusterName.String(r.clusterName),
	}

	// The federated graph ID is only known when a graph API token is configured.
	if s.graphApiToken != "" {
		claims, err := rjwt.ExtractFederatedGraphTokenClaims(s.graphApiToken)
		if err != nil {
			return nil, err
		}
		baseOtelAttributes = append(baseOtelAttributes, otel.WgFederatedGraphID.String(claims.FederatedGraphID))
	}

	s.baseOtelAttributes = baseOtelAttributes

	// Prepend the base router config version into a fresh slice so that
	// baseOtelAttributes itself is left unmodified.
	baseDefaultMuxAttributes := append([]attribute.KeyValue{otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion)}, baseOtelAttributes...)

	mapper := newAttributeMapper(!rmetric.IsUsingDefaultCloudExporter(s.metricConfig), s.metricConfig.Attributes)
	mappedMetricAttributes := mapper.mapAttributes(baseDefaultMuxAttributes)

	if s.metricConfig.OpenTelemetry.RouterRuntime {
		// We track runtime metrics with base router config version
		s.runtimeMetrics = rmetric.NewRuntimeMetrics(
			s.logger,
			s.otlpMeterProvider,
			mappedMetricAttributes,
			s.processStartTime,
		)

		// Start runtime metrics
		// NOTE(review): if a later step in this function fails, the started
		// runtime metrics are not stopped here — confirm whether that is needed.
		if err := s.runtimeMetrics.Start(); err != nil {
			return nil, err
		}
	}

	if isConnStoreEnabled {
		// NOTE(review): the second argument is nil here; confirm that no base
		// attributes are meant to be attached to the connection metrics.
		connStore, err := rmetric.NewConnectionMetricStore(
			s.logger,
			nil,
			s.otlpMeterProvider,
			s.promMeterProvider,
			s.metricConfig,
			s.traceDialer.connectionPoolStats,
		)
		if err != nil {
			return nil, err
		}
		s.connectionMetrics = connStore
	}

	if err := s.setupEngineStatistics(mappedMetricAttributes); err != nil {
		return nil, fmt.Errorf("failed to setup engine statistics: %w", err)
	}

	// Parse the graph public key up front when registration info is available.
	if s.registrationInfo != nil {
		publicKey, err := jwt.ParseECPublicKeyFromPEM([]byte(s.registrationInfo.GetGraphPublicKey()))
		if err != nil {
			return nil, fmt.Errorf("failed to parse router public key: %w", err)
		}
		s.publicKey = publicKey
	}

	httpRouter := chi.NewRouter()

	/**
	 * Middlewares
	 */

	// This recovery handler is used for everything before the graph mux to ensure that
	// we can recover from panics and log them properly.
	httpRouter.Use(recoveryhandler.New(recoveryhandler.WithLogHandler(func(_ http.ResponseWriter, _ *http.Request, err any) {
		s.logger.Error("[Recovery from panic]",
			zap.Any("error", err),
		)
	})))

	// Request traffic shaping related middlewares
	httpRouter.Use(rmiddleware.RequestSize(int64(s.routerTrafficConfig.MaxRequestBodyBytes)))
	if s.routerTrafficConfig.DecompressionEnabled {
		httpRouter.Use(rmiddleware.HandleCompression(s.logger))
	}

	httpRouter.Use(middleware.RequestID)
	httpRouter.Use(middleware.RealIP)
	if s.corsOptions.Enabled {
		httpRouter.Use(cors.New(*s.corsOptions))
	}

	if s.subgraphCircuitBreakerOptions.IsEnabled() {
		manager, err := circuit.NewManager(s.subgraphCircuitBreakerOptions.CircuitBreaker)
		if err != nil {
			return nil, err
		}
		s.circuitBreakerManager = manager
	}

	routingUrlGroupings, err := getRoutingUrlGroupingForCircuitBreakers(routerConfig, s.overrideRoutingURLConfiguration, s.overrides)
	if err != nil {
		return nil, err
	}

	// Build the base mux from the top-level engine config and subgraphs.
	gm, err := s.buildGraphMux(ctx, BuildGraphMuxOptions{
		RouterConfigVersion: s.baseRouterConfigVersion,
		EngineConfig:        routerConfig.GetEngineConfig(),
		ConfigSubgraphs:     routerConfig.GetSubgraphs(),
		RoutingUrlGroupings: routingUrlGroupings,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to build base mux: %w", err)
	}

	featureFlagConfigMap := routerConfig.FeatureFlagConfigs.GetConfigByFeatureFlagName()
	if len(featureFlagConfigMap) > 0 {
		s.logger.Info("Feature flags enabled", zap.Strings("flags", maps.Keys(featureFlagConfigMap)))
	}

	multiGraphHandler, err := s.buildMultiGraphHandler(ctx, gm.mux, featureFlagConfigMap)
	if err != nil {
		return nil, fmt.Errorf("failed to build feature flag handler: %w", err)
	}

	wrapper, err := gzhttp.NewWrapper(
		gzhttp.MinSize(1024*4), // 4KB
		gzhttp.CompressionLevel(gzip.DefaultCompression),
		gzhttp.ContentTypes(CompressibleContentTypes),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create gzip wrapper: %w", err)
	}

	if s.traceConfig.Enabled {
		handler := rtrace.NewTracingHandler(rtrace.TracingHandlerOpts{
			TraceConfig:         s.traceConfig,
			HealthCheckPath:     s.healthCheckPath,
			ReadinessCheckPath:  s.readinessCheckPath,
			LivenessCheckPath:   s.livenessCheckPath,
			CompositePropagator: s.compositePropagator,
			TracerProvider:      s.tracerProvider,
			SpanNameFormatter:   SpanNameFormatter,
		})
		httpRouter.Use(handler)
	}

	// Validate the batching limits before the batch handler is created below.
	if s.batchingConfig.Enabled {
		if s.batchingConfig.MaxConcurrentRoutines <= 0 {
			return nil, errors.New("maxConcurrent must be greater than 0")
		}
		if s.batchingConfig.MaxEntriesPerBatch <= 0 {
			return nil, errors.New("maxEntriesPerBatch must be greater than 0")
		}
	}

	/**
	 * A group where we can selectively apply middlewares to the graphql endpoint
	 */
	httpRouter.Group(func(cr chi.Router) {
		// We are applying it conditionally because compressing 3MB playground is still slow even with stdlib gzip
		cr.Use(func(h http.Handler) http.Handler {
			return wrapper(h)
		})

		if s.headerRules != nil {
			cr.Use(rmiddleware.CookieWhitelist(s.headerRules.CookieWhitelist, []string{featureFlagCookie}))
		}

		// Mount the feature flag handler. It calls the base mux if no feature flag is set.
		// When batching is enabled, the batch handler wraps the feature flag handler.
		if s.batchingConfig.Enabled {
			handler := Handler(
				HandlerOpts{
					MaxEntriesPerBatch: s.batchingConfig.MaxEntriesPerBatch,
					MaxRoutines:        s.batchingConfig.MaxConcurrentRoutines,
					OmitExtensions:     s.batchingConfig.OmitExtensions,
					HandlerSent:        multiGraphHandler,
					Tracer: r.tracerProvider.Tracer(
						"wundergraph/cosmo/router/internal/batch",
						oteltrace.WithInstrumentationVersion("0.0.1"),
					),
					Digest:              xxhash.New(),
					ClientHeader:        s.clientHeader,
					BaseOtelAttributes:  s.baseOtelAttributes,
					RouterConfigVersion: s.baseRouterConfigVersion,
					Logger:              s.logger,
				},
			)
			cr.Handle(r.graphqlPath, handler)
		} else {
			cr.Handle(r.graphqlPath, multiGraphHandler)
		}

		if r.webSocketConfiguration != nil && r.webSocketConfiguration.Enabled && r.webSocketConfiguration.AbsintheProtocol.Enabled {
			// Mount the Absinthe protocol handler for WebSockets
			// NOTE(review): this is registered on httpRouter rather than cr, so
			// presumably the group-only middlewares above (compression wrapper,
			// cookie whitelist) are skipped for this path — confirm this is intended.
			httpRouter.Handle(r.webSocketConfiguration.AbsintheProtocol.HandlerPath, multiGraphHandler)
		}
	})

	/**
	 * Routes
	 */

	// We mount the playground once here when we don't have a conflict with the websocket handler
	// If we have a conflict, we mount the playground during building the individual muxes
	if s.playgroundHandler != nil && s.graphqlPath != s.playgroundConfig.Path {
		httpRouter.Get(r.playgroundConfig.Path, s.playgroundHandler(nil).ServeHTTP)
	}

	// The generic health check path is served by the liveness handler.
	httpRouter.Get(s.healthCheckPath, r.healthcheck.Liveness())
	httpRouter.Get(s.livenessCheckPath, r.healthcheck.Liveness())
	httpRouter.Get(s.readinessCheckPath, r.healthcheck.Readiness())

	s.mux = httpRouter

	// Construction succeeded: ownership of cancel passes to the returned server.
	initialized = true

	return s, nil
}