router/core/supervisor_instance.go (284 lines of code) (raw):
package core
import (
"context"
"fmt"
"net/http"
"os"
"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/dustin/go-humanize"
"github.com/wundergraph/cosmo/router/pkg/authentication"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/controlplane/selfregister"
"github.com/wundergraph/cosmo/router/pkg/cors"
"github.com/wundergraph/cosmo/router/pkg/logging"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
)
// newRouter assembles the option list for a Router from the supplied resources
// and hands it to NewRouter.
//
// additionalOptions can be used to override default options or options provided in the config.
// They are appended directly after the options derived from params.Config and
// before the options computed in this function (access controller, proxy,
// access logs, self registration and execution config source).
//
// As a side effect the function tunes the Go runtime: GOMAXPROCS is set via
// maxprocs and, unless the GOMEMLIMIT environment variable is non-empty, the
// memory limit is set via memlimit.
func newRouter(ctx context.Context, params RouterResources, additionalOptions ...Option) (*Router, error) {
	cfg, logger := params.Config, params.Logger

	// Automatically set GOMAXPROCS to avoid CPU throttling on containerized environments
	if _, err := maxprocs.Set(maxprocs.Logger(logger.Sugar().Debugf)); err != nil {
		return nil, fmt.Errorf("could not set max GOMAXPROCS: %w", err)
	}

	if userLimit := os.Getenv("GOMEMLIMIT"); userLimit != "" {
		logger.Info("GOMEMLIMIT set by user", zap.String("limit", userLimit))
	} else {
		// Automatically set GOMEMLIMIT to 90% of the available memory.
		// This is an effort to prevent the router from being killed by OOM (Out Of Memory)
		// when the system is under memory pressure e.g. when GC is not able to free memory fast enough.
		// More details: https://tip.golang.org/doc/gc-guide#Memory_limit
		appliedLimit, limitErr := memlimit.SetGoMemLimitWithOpts(
			memlimit.WithRatio(0.9),
			memlimit.WithProvider(memlimit.FromCgroupHybrid),
		)
		switch {
		case limitErr == nil:
			logger.Info("GOMEMLIMIT set automatically", zap.String("limit", humanize.Bytes(uint64(appliedLimit))))
		case !cfg.DevelopmentMode:
			// In development mode the failure is deliberately not reported.
			logger.Warn("GOMEMLIMIT was not set. Please set it manually to around 90% of the available memory to prevent OOM kills", zap.Error(limitErr))
		}
	}

	// Config-derived options first, caller-supplied options right after them.
	options := append(optionsFromResources(logger, cfg), additionalOptions...)

	authenticators, err := setupAuthenticators(ctx, logger, cfg)
	if err != nil {
		return nil, fmt.Errorf("could not setup authenticators: %w", err)
	}
	if len(authenticators) > 0 {
		accessController := NewAccessController(authenticators, cfg.Authorization.RequireAuthentication)
		options = append(options, WithAccessController(accessController))
	}

	// HTTP_PROXY, HTTPS_PROXY and NO_PROXY
	if hasProxyConfigured() {
		options = append(options, WithProxy(http.ProxyFromEnvironment))
	}

	if cfg.AccessLogs.Enabled {
		accessLogs := &AccessLogsConfig{
			Attributes:         cfg.AccessLogs.Router.Fields,
			SubgraphEnabled:    cfg.AccessLogs.Subgraphs.Enabled,
			SubgraphAttributes: cfg.AccessLogs.Subgraphs.Fields,
		}

		// Settings shared by both buffered sinks; only the WS field differs.
		// Kept as a closure so nothing is evaluated unless buffering is enabled.
		bufferedOptions := func() logging.BufferedLoggerOptions {
			return logging.BufferedLoggerOptions{
				BufferSize:    int(cfg.AccessLogs.Buffer.Size.Uint64()),
				FlushInterval: cfg.AccessLogs.Buffer.FlushInterval,
				Development:   cfg.DevelopmentMode,
				Level:         zap.InfoLevel,
				Pretty:        !cfg.JSONLog,
			}
		}

		// The file output takes precedence over stdout. When neither is
		// enabled the Logger field is left unset.
		switch {
		case cfg.AccessLogs.Output.File.Enabled:
			logFile, err := logging.NewLogFile(cfg.AccessLogs.Output.File.Path)
			if err != nil {
				return nil, fmt.Errorf("could not create log file: %w", err)
			}
			if cfg.AccessLogs.Buffer.Enabled {
				sinkOptions := bufferedOptions()
				sinkOptions.WS = logFile
				buffered, err := logging.NewJSONZapBufferedLogger(sinkOptions)
				if err != nil {
					return nil, fmt.Errorf("could not create buffered logger: %w", err)
				}
				accessLogs.Logger = buffered.Logger
			} else {
				accessLogs.Logger = logging.NewZapAccessLogger(logFile, cfg.DevelopmentMode, !cfg.JSONLog)
			}
		case cfg.AccessLogs.Output.Stdout.Enabled:
			if cfg.AccessLogs.Buffer.Enabled {
				sinkOptions := bufferedOptions()
				sinkOptions.WS = os.Stdout
				buffered, err := logging.NewJSONZapBufferedLogger(sinkOptions)
				if err != nil {
					return nil, fmt.Errorf("could not create buffered logger: %w", err)
				}
				accessLogs.Logger = buffered.Logger
			} else {
				accessLogs.Logger = logging.NewZapAccessLogger(os.Stdout, cfg.DevelopmentMode, !cfg.JSONLog)
			}
		}

		options = append(options, WithAccessLogs(accessLogs))
	}

	// Self registration needs both the feature flag and a graph token.
	if cfg.RouterRegistration && cfg.Graph.Token != "" {
		registration, err := selfregister.New(cfg.ControlplaneURL, cfg.Graph.Token,
			selfregister.WithLogger(logger),
		)
		if err != nil {
			return nil, fmt.Errorf("could not create self register: %w", err)
		}
		options = append(options, WithSelfRegistration(registration))
	}

	// Prefer the execution config file path and fall back to RouterConfigPath.
	// Without any path the router is configured with the config poller instead.
	configPath := cfg.ExecutionConfig.File.Path
	if configPath == "" {
		configPath = cfg.RouterConfigPath
	}

	if configPath == "" {
		options = append(options, WithConfigPollerConfig(&RouterConfigPollerConfig{
			GraphSignKey:    cfg.Graph.SignKey,
			PollInterval:    cfg.PollInterval,
			PollJitter:      cfg.PollJitter,
			ExecutionConfig: cfg.ExecutionConfig,
		}))
	} else {
		options = append(options, WithExecutionConfig(&ExecutionConfig{
			Watch:         cfg.ExecutionConfig.File.Watch,
			WatchInterval: cfg.ExecutionConfig.File.WatchInterval,
			Path:          configPath,
		}))
	}

	return NewRouter(options...)
}
// optionsFromResources translates the router configuration into the list of
// router options. The logger is passed through via WithLogger; every other
// option is populated from fields of config. The options are returned in a
// fixed order.
func optionsFromResources(logger *zap.Logger, config *config.Config) []Option {
	// Nested option payloads are assembled up front so the option list below
	// stays a flat, one-line-per-option table.
	graphqlMetrics := &GraphQLMetricsConfig{
		Enabled:           config.GraphqlMetrics.Enabled,
		CollectorEndpoint: config.GraphqlMetrics.CollectorEndpoint,
	}
	ipAnonymization := &IPAnonymizationConfig{
		Enabled: config.Compliance.AnonymizeIP.Enabled,
		Method:  IPAnonymizationMethod(config.Compliance.AnonymizeIP.Method),
	}
	batching := &BatchingConfig{
		Enabled:               config.Batching.Enabled,
		MaxConcurrentRoutines: config.Batching.MaxConcurrency,
		MaxEntriesPerBatch:    config.Batching.MaxEntriesPerBatch,
		OmitExtensions:        config.Batching.OmitExtensions,
	}
	corsConfig := &cors.Config{
		Enabled:          config.CORS.Enabled,
		AllowOrigins:     config.CORS.AllowOrigins,
		AllowMethods:     config.CORS.AllowMethods,
		AllowCredentials: config.CORS.AllowCredentials,
		AllowHeaders:     config.CORS.AllowHeaders,
		MaxAge:           config.CORS.MaxAge,
	}
	tlsConfig := &TlsConfig{
		Enabled:  config.TLS.Server.Enabled,
		CertFile: config.TLS.Server.CertFile,
		KeyFile:  config.TLS.Server.KeyFile,
		ClientAuth: &TlsClientAuthConfig{
			CertFile: config.TLS.Server.ClientAuth.CertFile,
			Required: config.TLS.Server.ClientAuth.Required,
		},
	}

	return []Option{
		WithListenerAddr(config.ListenAddr),
		WithOverrideRoutingURL(config.OverrideRoutingURL),
		WithOverrides(config.Overrides),
		WithLogger(logger),
		WithIntrospection(config.IntrospectionEnabled),
		WithQueryPlans(config.QueryPlansEnabled),
		WithPlayground(config.PlaygroundEnabled),
		WithGraphApiToken(config.Graph.Token),
		WithPersistedOperationsConfig(config.PersistedOperationsConfig),
		WithAutomatedPersistedQueriesConfig(config.AutomaticPersistedQueries),
		WithApolloCompatibilityFlagsConfig(config.ApolloCompatibilityFlags),
		WithApolloRouterCompatibilityFlags(config.ApolloRouterCompatibilityFlags),
		WithStorageProviders(config.StorageProviders),
		WithGraphQLPath(config.GraphQLPath),
		WithModulesConfig(config.Modules),
		WithGracePeriod(config.GracePeriod),
		WithPlaygroundConfig(config.PlaygroundConfig),
		WithPlaygroundPath(config.PlaygroundPath),
		WithHealthCheckPath(config.HealthCheckPath),
		WithLivenessCheckPath(config.LivenessCheckPath),
		WithGraphQLMetrics(graphqlMetrics),
		WithAnonymization(ipAnonymization),
		WithBatching(batching),
		WithClusterName(config.Cluster.Name),
		WithInstanceID(config.InstanceID),
		WithReadinessCheckPath(config.ReadinessCheckPath),
		WithHeaderRules(config.Headers),
		WithRouterTrafficConfig(&config.TrafficShaping.Router),
		WithFileUploadConfig(&config.FileUpload),
		WithSubgraphTransportOptions(NewSubgraphTransportOptions(config.TrafficShaping)),
		WithSubgraphCircuitBreakerOptions(NewSubgraphCircuitBreakerOptions(config.TrafficShaping)),
		WithSubgraphRetryOptions(
			config.TrafficShaping.All.BackoffJitterRetry.Enabled,
			config.TrafficShaping.All.BackoffJitterRetry.MaxAttempts,
			config.TrafficShaping.All.BackoffJitterRetry.MaxDuration,
			config.TrafficShaping.All.BackoffJitterRetry.Interval,
		),
		WithCors(corsConfig),
		WithTLSConfig(tlsConfig),
		WithDevelopmentMode(config.DevelopmentMode),
		WithTracing(TraceConfigFromTelemetry(&config.Telemetry)),
		WithMetrics(MetricConfigFromTelemetry(&config.Telemetry)),
		WithTelemetryAttributes(config.Telemetry.Attributes),
		WithTracingAttributes(config.Telemetry.Tracing.Attributes),
		WithEngineExecutionConfig(config.EngineExecutionConfiguration),
		WithCacheControlPolicy(config.CacheControl),
		WithSecurityConfig(config.SecurityConfiguration),
		WithAuthorizationConfig(&config.Authorization),
		WithWebSocketConfiguration(&config.WebSocket),
		WithSubgraphErrorPropagation(config.SubgraphErrorPropagation),
		WithLocalhostFallbackInsideDocker(config.LocalhostFallbackInsideDocker),
		WithCDN(config.CDN),
		WithEvents(config.Events),
		WithRateLimitConfig(&config.RateLimit),
		WithClientHeader(config.ClientHeader),
		WithCacheWarmupConfig(&config.CacheWarmup),
		WithMCP(config.MCP),
		WithPlugins(config.Plugins),
		WithDemoMode(config.DemoMode),
	}
}
// setupAuthenticators builds the JWT based authenticators described by cfg.
//
// It returns (nil, nil) when no JWKS entries are configured. Otherwise it
// creates one token decoder from all JWKS entries and returns an HTTP header
// authenticator, followed by a websocket initial payload authenticator when
// cfg.WebSocket.Authentication.FromInitialPayload.Enabled is set. Both
// authenticators share the same token decoder.
//
// An error is returned when the token decoder or either authenticator cannot
// be created.
func setupAuthenticators(ctx context.Context, logger *zap.Logger, cfg *config.Config) ([]authentication.Authenticator, error) {
	jwtConf := cfg.Authentication.JWT
	if len(jwtConf.JWKS) == 0 {
		// No JWT authenticators configured
		return nil, nil
	}

	configs := make([]authentication.JWKSConfig, 0, len(jwtConf.JWKS))
	for _, jwks := range jwtConf.JWKS {
		configs = append(configs, authentication.JWKSConfig{
			URL:               jwks.URL,
			RefreshInterval:   jwks.RefreshInterval,
			AllowedAlgorithms: jwks.Algorithms,
			Secret:            jwks.Secret,
			Algorithm:         jwks.Algorithm,
			KeyId:             jwks.KeyId,
		})
	}

	tokenDecoder, err := authentication.NewJwksTokenDecoder(ctx, logger, configs)
	if err != nil {
		return nil, err
	}

	// create a map for the `httpHeaderAuthenticator`
	headerSourceMap := map[string][]string{
		jwtConf.HeaderName: {jwtConf.HeaderValuePrefix},
	}

	// The `websocketInitialPayloadAuthenticator` has one key and uses a flat list of prefixes.
	// The list is de-duplicated but kept in configuration order: collecting it
	// by ranging over a map would hand the authenticator a differently ordered
	// list on every start.
	seenPrefixes := make(map[string]struct{})
	headerPrefixes := make([]string, 0)

	for _, s := range jwtConf.HeaderSources {
		if s.Type != "header" {
			continue
		}

		for _, prefix := range s.ValuePrefixes {
			headerSourceMap[s.Name] = append(headerSourceMap[s.Name], prefix)

			if _, duplicate := seenPrefixes[prefix]; duplicate {
				continue
			}
			seenPrefixes[prefix] = struct{}{}
			headerPrefixes = append(headerPrefixes, prefix)
		}
	}

	var authenticators []authentication.Authenticator

	headerAuthenticator, err := authentication.NewHttpHeaderAuthenticator(authentication.HttpHeaderAuthenticatorOptions{
		Name:                 "jwks",
		HeaderSourcePrefixes: headerSourceMap,
		TokenDecoder:         tokenDecoder,
	})
	if err != nil {
		logger.Error("Could not create HttpHeader authenticator", zap.Error(err))
		return nil, err
	}
	authenticators = append(authenticators, headerAuthenticator)

	if cfg.WebSocket.Authentication.FromInitialPayload.Enabled {
		payloadAuthenticator, err := authentication.NewWebsocketInitialPayloadAuthenticator(authentication.WebsocketInitialPayloadAuthenticatorOptions{
			TokenDecoder:        tokenDecoder,
			Key:                 cfg.WebSocket.Authentication.FromInitialPayload.Key,
			HeaderValuePrefixes: headerPrefixes,
		})
		if err != nil {
			logger.Error("Could not create WebsocketInitialPayload authenticator", zap.Error(err))
			return nil, err
		}
		authenticators = append(authenticators, payloadAuthenticator)
	}

	return authenticators, nil
}
// hasProxyConfigured reports whether any of the proxy related environment
// variables is present in the environment.
//
// Both the upper-case and the lower-case spellings are checked, because
// http.ProxyFromEnvironment honours HTTP_PROXY, HTTPS_PROXY and NO_PROXY as
// well as their lowercase versions. A variable that is set to the empty string
// still counts as present.
func hasProxyConfigured() bool {
	proxyEnvVars := [...]string{
		"HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY",
		"http_proxy", "https_proxy", "no_proxy",
	}

	for _, name := range proxyEnvVars {
		if _, present := os.LookupEnv(name); present {
			return true
		}
	}
	return false
}