func newGraphServer()

in router/core/graph_server.go [117:376]


// newGraphServer builds the graphServer that serves the given router execution config.
//
// It performs the following steps, in order:
//   - rejects execution configs whose compatibility version this router does not support,
//   - creates the base HTTP transport and one transport per entry in the subgraph transport map,
//   - sets up runtime metrics, connection metrics and engine statistics as enabled by the metric config,
//   - parses the graph public key when registration info is present,
//   - builds the base graph mux and the multi graph (feature flag) handler,
//   - registers middlewares and routes (GraphQL endpoint, optional Absinthe WebSocket path,
//     playground, health/liveness/readiness checks) on a chi router that is stored in s.mux.
//
// The passed ctx is wrapped in a cancelable context whose cancel func is stored on the
// returned server. If construction fails, that derived context is canceled before returning,
// because the server (and with it the only reference to the cancel func) is discarded.
//
// It returns the fully wired server, or nil and an error if any step fails.
func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterConfig, proxy ProxyFunc) (*graphServer, error) {
	/* Older versions of composition will not populate a compatibility version.
	 * Currently, all "old" router execution configurations are compatible as there have been no breaking
	 * changes.
	 * Upon the first breaking change to the execution config, an unpopulated compatibility version will
	 * also be unsupported (and the logic for IsRouterCompatibleWithExecutionConfig will need to be updated).
	 */
	if !execution_config.IsRouterCompatibleWithExecutionConfig(r.logger, routerConfig.CompatibilityVersion) {
		return nil, fmt.Errorf(`the compatibility version "%s" is not compatible with this router version`, routerConfig.CompatibilityVersion)
	}

	// The trace dialer is only created when connection stats are requested by at least one exporter.
	isConnStoreEnabled := r.Config.metricConfig.OpenTelemetry.ConnectionStats || r.Config.metricConfig.Prometheus.ConnectionStats
	var traceDialer *TraceDialer
	if isConnStoreEnabled {
		traceDialer = NewTraceDialer()
	}

	// Base transport
	baseTransport := newHTTPTransport(r.subgraphTransportOptions.TransportRequestOptions, proxy, traceDialer, "")

	// Subgraph transports
	subgraphTransports := map[string]*http.Transport{}
	for subgraph, subgraphOpts := range r.subgraphTransportOptions.SubgraphMap {
		subgraphBaseTransport := newHTTPTransport(subgraphOpts, proxy, traceDialer, subgraph)
		subgraphTransports[subgraph] = subgraphBaseTransport
	}

	ctx, cancel := context.WithCancel(ctx)

	// The cancel func is handed over to the returned server. On every failure path
	// below the server is discarded, so nobody else could cancel the derived context;
	// release it here instead of leaking it until the parent context ends.
	initialized := false
	defer func() {
		if !initialized {
			cancel()
		}
	}()

	s := &graphServer{
		context:                 ctx,
		cancelFunc:              cancel,
		Config:                  &r.Config,
		engineStats:             r.EngineStats,
		baseTransport:           baseTransport,
		subgraphTransports:      subgraphTransports,
		playgroundHandler:       r.playgroundHandler,
		traceDialer:             traceDialer,
		baseRouterConfigVersion: routerConfig.GetVersion(),
		inFlightRequests:        &atomic.Uint64{},
		graphMuxList:            make([]*graphMux, 0, 1),
		instanceData: InstanceData{
			HostName:      r.hostName,
			ListenAddress: r.listenAddr,
		},
		storageProviders: &r.storageProviders,
	}

	baseOtelAttributes := []attribute.KeyValue{
		otel.WgRouterVersion.String(Version),
		otel.WgRouterClusterName.String(r.clusterName),
	}

	// When a graph API token is configured, tag telemetry with the federated graph ID from its claims.
	if s.graphApiToken != "" {
		claims, err := rjwt.ExtractFederatedGraphTokenClaims(s.graphApiToken)
		if err != nil {
			return nil, err
		}
		baseOtelAttributes = append(baseOtelAttributes, otel.WgFederatedGraphID.String(claims.FederatedGraphID))
	}

	s.baseOtelAttributes = baseOtelAttributes

	// Built into a fresh slice so that baseOtelAttributes itself is not extended.
	baseDefaultMuxAttributes := append([]attribute.KeyValue{otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion)}, baseOtelAttributes...)

	mapper := newAttributeMapper(!rmetric.IsUsingDefaultCloudExporter(s.metricConfig), s.metricConfig.Attributes)
	mappedMetricAttributes := mapper.mapAttributes(baseDefaultMuxAttributes)

	if s.metricConfig.OpenTelemetry.RouterRuntime {
		// We track runtime metrics with base router config version
		s.runtimeMetrics = rmetric.NewRuntimeMetrics(
			s.logger,
			s.otlpMeterProvider,
			mappedMetricAttributes,
			s.processStartTime,
		)

		// Start runtime metrics
		// NOTE(review): once started, the runtime metrics are not stopped by this function
		// if a later step fails — confirm whether they need an explicit shutdown on error.
		if err := s.runtimeMetrics.Start(); err != nil {
			return nil, err
		}
	}

	if isConnStoreEnabled {
		// traceDialer is non-nil here: it is created above under the same condition.
		connStore, err := rmetric.NewConnectionMetricStore(
			s.logger,
			nil,
			s.otlpMeterProvider,
			s.promMeterProvider,
			s.metricConfig,
			s.traceDialer.connectionPoolStats,
		)
		if err != nil {
			return nil, err
		}
		s.connectionMetrics = connStore
	}

	if err := s.setupEngineStatistics(mappedMetricAttributes); err != nil {
		return nil, fmt.Errorf("failed to setup engine statistics: %w", err)
	}

	if s.registrationInfo != nil {
		publicKey, err := jwt.ParseECPublicKeyFromPEM([]byte(s.registrationInfo.GetGraphPublicKey()))
		if err != nil {
			return nil, fmt.Errorf("failed to parse router public key: %w", err)
		}
		s.publicKey = publicKey
	}

	httpRouter := chi.NewRouter()

	/**
	* Middlewares
	 */

	// This recovery handler is used for everything before the graph mux to ensure that
	// we can recover from panics and log them properly.
	httpRouter.Use(recoveryhandler.New(recoveryhandler.WithLogHandler(func(w http.ResponseWriter, r *http.Request, err any) {
		s.logger.Error("[Recovery from panic]",
			zap.Any("error", err),
		)
	})))

	// Request traffic shaping related middlewares
	httpRouter.Use(rmiddleware.RequestSize(int64(s.routerTrafficConfig.MaxRequestBodyBytes)))
	if s.routerTrafficConfig.DecompressionEnabled {
		httpRouter.Use(rmiddleware.HandleCompression(s.logger))
	}

	httpRouter.Use(middleware.RequestID)
	httpRouter.Use(middleware.RealIP)
	if s.corsOptions.Enabled {
		httpRouter.Use(cors.New(*s.corsOptions))
	}

	if s.subgraphCircuitBreakerOptions.IsEnabled() {
		manager, err := circuit.NewManager(s.subgraphCircuitBreakerOptions.CircuitBreaker)
		if err != nil {
			return nil, err
		}
		s.circuitBreakerManager = manager
	}

	routingUrlGroupings, err := getRoutingUrlGroupingForCircuitBreakers(routerConfig, s.overrideRoutingURLConfiguration, s.overrides)
	if err != nil {
		return nil, err
	}

	// Build the base mux from the top-level engine config and subgraphs of the router config.
	gm, err := s.buildGraphMux(ctx, BuildGraphMuxOptions{
		RouterConfigVersion: s.baseRouterConfigVersion,
		EngineConfig:        routerConfig.GetEngineConfig(),
		ConfigSubgraphs:     routerConfig.GetSubgraphs(),
		RoutingUrlGroupings: routingUrlGroupings,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to build base mux: %w", err)
	}

	featureFlagConfigMap := routerConfig.FeatureFlagConfigs.GetConfigByFeatureFlagName()
	if len(featureFlagConfigMap) > 0 {
		s.logger.Info("Feature flags enabled", zap.Strings("flags", maps.Keys(featureFlagConfigMap)))
	}

	multiGraphHandler, err := s.buildMultiGraphHandler(ctx, gm.mux, featureFlagConfigMap)
	if err != nil {
		return nil, fmt.Errorf("failed to build feature flag handler: %w", err)
	}

	// Response compression wrapper; only applied inside the GraphQL route group below.
	wrapper, err := gzhttp.NewWrapper(
		gzhttp.MinSize(1024*4), // 4KB
		gzhttp.CompressionLevel(gzip.DefaultCompression),
		gzhttp.ContentTypes(CompressibleContentTypes),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create gzip wrapper: %w", err)
	}

	if s.traceConfig.Enabled {
		handler := rtrace.NewTracingHandler(rtrace.TracingHandlerOpts{
			TraceConfig:         s.traceConfig,
			HealthCheckPath:     s.healthCheckPath,
			ReadinessCheckPath:  s.readinessCheckPath,
			LivenessCheckPath:   s.livenessCheckPath,
			CompositePropagator: s.compositePropagator,
			TracerProvider:      s.tracerProvider,
			SpanNameFormatter:   SpanNameFormatter,
		})
		httpRouter.Use(handler)
	}

	// Validate the batching limits before the batch handler is constructed below.
	if s.batchingConfig.Enabled {
		if s.batchingConfig.MaxConcurrentRoutines <= 0 {
			return nil, errors.New("maxConcurrent must be greater than 0")
		}
		if s.batchingConfig.MaxEntriesPerBatch <= 0 {
			return nil, errors.New("maxEntriesPerBatch must be greater than 0")
		}
	}

	/**
	* A group where we can selectively apply middlewares to the graphql endpoint
	 */
	httpRouter.Group(func(cr chi.Router) {
		// We are applying it conditionally because compressing 3MB playground is still slow even with stdlib gzip
		cr.Use(func(h http.Handler) http.Handler {
			return wrapper(h)
		})

		if s.headerRules != nil {
			cr.Use(rmiddleware.CookieWhitelist(s.headerRules.CookieWhitelist, []string{featureFlagCookie}))
		}

		// Mount the feature flag handler. It calls the base mux if no feature flag is set.
		// With batching enabled, the multi graph handler is wrapped by the batch handler.
		if s.batchingConfig.Enabled {
			handler := Handler(
				HandlerOpts{
					MaxEntriesPerBatch: s.batchingConfig.MaxEntriesPerBatch,
					MaxRoutines:        s.batchingConfig.MaxConcurrentRoutines,
					OmitExtensions:     s.batchingConfig.OmitExtensions,
					HandlerSent:        multiGraphHandler,
					Tracer: r.tracerProvider.Tracer(
						"wundergraph/cosmo/router/internal/batch",
						oteltrace.WithInstrumentationVersion("0.0.1"),
					),
					Digest:              xxhash.New(),
					ClientHeader:        s.clientHeader,
					BaseOtelAttributes:  s.baseOtelAttributes,
					RouterConfigVersion: s.baseRouterConfigVersion,
					Logger:              s.logger,
				},
			)
			cr.Handle(r.graphqlPath, handler)
		} else {
			cr.Handle(r.graphqlPath, multiGraphHandler)
		}

		if r.webSocketConfiguration != nil && r.webSocketConfiguration.Enabled && r.webSocketConfiguration.AbsintheProtocol.Enabled {
			// Mount the Absinthe protocol handler for WebSockets
			// NOTE(review): this registers on httpRouter rather than cr, so the group-scoped
			// middlewares above presumably do not apply to this path — confirm this is intended.
			httpRouter.Handle(r.webSocketConfiguration.AbsintheProtocol.HandlerPath, multiGraphHandler)
		}
	})

	/**
	* Routes
	 */

	// We mount the playground once here when we don't have a conflict with the websocket handler
	// If we have a conflict, we mount the playground during building the individual muxes
	if s.playgroundHandler != nil && s.graphqlPath != s.playgroundConfig.Path {
		httpRouter.Get(r.playgroundConfig.Path, s.playgroundHandler(nil).ServeHTTP)
	}

	// The generic health check path is served by the liveness handler.
	httpRouter.Get(s.healthCheckPath, r.healthcheck.Liveness())
	httpRouter.Get(s.livenessCheckPath, r.healthcheck.Liveness())
	httpRouter.Get(s.readinessCheckPath, r.healthcheck.Readiness())

	s.mux = httpRouter

	// Construction succeeded: ownership of the cancel func now lies with the server.
	initialized = true

	return s, nil
}