router/core/executor.go (241 lines of code) (raw):
package core
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"time"
"github.com/nats-io/nats.go"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
"go.uber.org/zap"
"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/introspection_datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport"
nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
"github.com/wundergraph/cosmo/router/pkg/config"
)
// ExecutorConfigurationBuilder holds the router-wide settings needed to build
// an Executor (see Build).
type ExecutorConfigurationBuilder struct {
// introspection controls whether Build registers the introspection data source
// and its field configurations on the plan configuration.
introspection bool
// trackUsageInfo is passed to the configuration loader and copied to
// Executor.TrackUsageInfo.
trackUsageInfo bool
// baseURL is not read by any code visible in this file.
// NOTE(review): confirm where it is consumed.
baseURL string
// transport is handed to the default factory resolver when the planner
// configuration is built.
transport http.RoundTripper
// logger is handed to the default factory resolver when the planner
// configuration is built.
logger *zap.Logger
// transportOptions is handed to the default factory resolver when the planner
// configuration is built.
transportOptions *TransportOptions
}
// Executor bundles everything produced by ExecutorConfigurationBuilder.Build:
// the plan configuration, the parsed schemas, the resolver and related settings.
type Executor struct {
// PlanConfig is the plan configuration loaded from the engine config, including
// the introspection fields and data sources when introspection is enabled.
PlanConfig plan.Configuration
// ClientSchema is the GraphQL Schema that is exposed from our API.
// It is used for the introspection and query normalization/validation.
// When the engine config carries no client schema, it points to the same
// document as RouterSchema.
ClientSchema *ast.Document
// RouterSchema is the GraphQL Schema that we use for planning the queries.
RouterSchema *ast.Document
// Resolver is the resolver created by Build from the router engine configuration.
Resolver *resolve.Resolver
// RenameTypeNames holds one entry per plan type configuration that has a
// non-empty RenameTo value.
RenameTypeNames []resolve.RenameTypeName
// TrackUsageInfo mirrors the builder's trackUsageInfo setting.
TrackUsageInfo bool
}
// ExecutorBuildOptions carries the per-build inputs of
// ExecutorConfigurationBuilder.Build.
type ExecutorBuildOptions struct {
// EngineConfig provides the router GraphQL schema and, optionally, the client
// schema; it is also passed to the configuration loader.
EngineConfig *nodev1.EngineConfiguration
// Subgraphs is passed to the configuration loader together with EngineConfig.
Subgraphs []*nodev1.Subgraph
// RouterEngineConfig supplies the execution, debug and subgraph error
// propagation settings used for the planner and the resolver.
RouterEngineConfig *RouterEngineConfiguration
// PubSubProviders supplies the NATS and Kafka providers handed to the default
// factory resolver.
PubSubProviders *EnginePubSubProviders
// Reporter is set as the resolver's Reporter option.
Reporter resolve.Reporter
// ApolloCompatibilityFlags toggles the corresponding ApolloCompatibility*
// resolvable options.
ApolloCompatibilityFlags config.ApolloCompatibilityFlags
// ApolloRouterCompatibilityFlags toggles the ApolloRouterCompatibility*
// resolvable options.
ApolloRouterCompatibilityFlags config.ApolloRouterCompatibilityFlags
// HeartbeatInterval is used as the resolver's MultipartSubHeartbeatInterval.
HeartbeatInterval time.Duration
}
// Build assembles an Executor from the given build options.
//
// It loads the planner configuration, parses the router schema (and the client
// schema, when the engine config provides one) and merges each with the base
// schema, optionally registers the introspection data source, derives the
// type-rename mappings and finally creates the resolver.
//
// The resolver is stateful, so it is created last: a build that fails in one
// of the earlier steps does not leave a resolver behind.
//
// An error is returned when opts, opts.EngineConfig or opts.RouterEngineConfig
// is nil, or when any of the steps above fails.
func (b *ExecutorConfigurationBuilder) Build(ctx context.Context, opts *ExecutorBuildOptions) (*Executor, error) {
	switch {
	case opts == nil:
		return nil, errors.New("executor build options must not be nil")
	case opts.EngineConfig == nil:
		return nil, errors.New("executor build options: engine config must not be nil")
	case opts.RouterEngineConfig == nil:
		return nil, errors.New("executor build options: router engine config must not be nil")
	}

	planConfig, err := b.buildPlannerConfiguration(ctx, opts.EngineConfig, opts.Subgraphs, opts.RouterEngineConfig, opts.PubSubProviders)
	if err != nil {
		return nil, fmt.Errorf("failed to build planner configuration: %w", err)
	}

	// routerSchema is the GraphQL Schema that we use for planning the queries.
	routerSchema, err := parseSchemaWithBaseDefinitions(opts.EngineConfig.GraphqlSchema, "graphql schema")
	if err != nil {
		return nil, err
	}

	// clientSchema is the GraphQL Schema that is exposed from our API; it is used
	// for the introspection and query normalization/validation.
	// In the event that a client schema is not generated, the router schema is
	// used in place of the client schema (e.g., for operation validation).
	clientSchema := routerSchema
	if clientSchemaStr := opts.EngineConfig.GetGraphqlClientSchema(); clientSchemaStr != "" {
		// The client schema is a subset of the router schema that does not include @inaccessible fields.
		// The client schema only exists if the federated schema includes @inaccessible directives or @tag directives.
		clientSchema, err = parseSchemaWithBaseDefinitions(clientSchemaStr, "graphql client schema")
		if err != nil {
			return nil, err
		}
	}

	if b.introspection {
		// By default, the engine doesn't understand how to resolve the __schema and __type queries,
		// so we add a special datasource that takes the definition as input and generates
		// introspection data. It is attached to Query.__schema, Query.__type, __Type.fields
		// and __Type.enumValues.
		introspectionFactory, factoryErr := introspection_datasource.NewIntrospectionConfigFactory(clientSchema)
		if factoryErr != nil {
			return nil, fmt.Errorf("failed to create introspection config factory: %w", factoryErr)
		}
		// Without these field configurations the engine wouldn't know how to resolve the fields.
		planConfig.Fields = append(planConfig.Fields, introspectionFactory.BuildFieldConfigurations()...)
		// Finally, add the introspection data source to the existing data sources.
		planConfig.DataSources = append(planConfig.DataSources, introspectionFactory.BuildDataSourceConfigurations()...)
	}

	// When applying namespacing, it's possible that we need to rename types.
	// For that, we map the rename types config to the engine's rename type names.
	var renameTypeNames []resolve.RenameTypeName
	for _, typeCfg := range planConfig.Types {
		if typeCfg.RenameTo == "" {
			continue
		}
		renameTypeNames = append(renameTypeNames, resolve.RenameTypeName{
			From: []byte(typeCfg.RenameTo),
			To:   []byte(typeCfg.TypeName),
		})
	}

	// This is the resolver, it's stateful and manages all the client connections, etc...
	// It is created only after every fallible step above has succeeded.
	resolver := resolve.New(ctx, executorResolverOptions(opts))

	return &Executor{
		PlanConfig:      *planConfig,
		ClientSchema:    clientSchema,
		RouterSchema:    routerSchema,
		Resolver:        resolver,
		RenameTypeNames: renameTypeNames,
		TrackUsageInfo:  b.trackUsageInfo,
	}, nil
}

// executorResolverOptions maps the build options onto resolve.ResolverOptions.
// opts and opts.RouterEngineConfig must be non-nil.
func executorResolverOptions(opts *ExecutorBuildOptions) resolve.ResolverOptions {
	execution := opts.RouterEngineConfig.Execution
	propagation := opts.RouterEngineConfig.SubgraphErrorPropagation

	options := resolve.ResolverOptions{
		MaxConcurrency:                     execution.MaxConcurrentResolvers,
		Debug:                              execution.Debug.EnableResolverDebugging,
		Reporter:                           opts.Reporter,
		PropagateSubgraphErrors:            propagation.Enabled,
		PropagateSubgraphStatusCodes:       propagation.PropagateStatusCodes,
		RewriteSubgraphErrorPaths:          propagation.RewritePaths,
		OmitSubgraphErrorLocations:         propagation.OmitLocations,
		OmitSubgraphErrorExtensions:        propagation.OmitExtensions,
		AllowedErrorExtensionFields:        propagation.AllowedExtensionFields,
		AttachServiceNameToErrorExtensions: propagation.AttachServiceName,
		DefaultErrorExtensionCode:          propagation.DefaultExtensionCode,
		AllowedSubgraphErrorFields:         propagation.AllowedFields,
		MaxRecyclableParserSize:            execution.ResolverMaxRecyclableParserSize,
		MultipartSubHeartbeatInterval:      opts.HeartbeatInterval,
		MaxSubscriptionFetchTimeout:        execution.SubscriptionFetchTimeout,
	}

	apollo := opts.ApolloCompatibilityFlags
	options.ResolvableOptions.ApolloCompatibilityValueCompletionInExtensions = apollo.ValueCompletion.Enabled
	options.ResolvableOptions.ApolloCompatibilityTruncateFloatValues = apollo.TruncateFloats.Enabled
	options.ResolvableOptions.ApolloCompatibilitySuppressFetchErrors = apollo.SuppressFetchErrors.Enabled
	options.ResolvableOptions.ApolloCompatibilityReplaceUndefinedOpFieldError = apollo.ReplaceUndefinedOpFieldErrors.Enabled
	options.ResolvableOptions.ApolloCompatibilityReplaceInvalidVarError = apollo.ReplaceInvalidVarErrors.Enabled
	options.ResolvableOptions.ApolloRouterCompatibilitySubrequestHTTPError = opts.ApolloRouterCompatibilityFlags.SubrequestHTTPError.Enabled

	// Wrapped is the mode for every configured value except an explicit passthrough.
	options.SubgraphErrorPropagationMode = resolve.SubgraphErrorPropagationModeWrapped
	if propagation.Mode == config.SubgraphErrorPropagationModePassthrough {
		options.SubgraphErrorPropagationMode = resolve.SubgraphErrorPropagationModePassThrough
	}

	return options
}

// parseSchemaWithBaseDefinitions parses schema and merges it with the base schema.
// The base schema contains the __schema and __type queries as well as built-in
// scalars like Int, String, etc. These are usually not part of a regular GraphQL
// schema, but the engine needs them defined to resolve such fields.
// label names the schema in error messages (e.g. "graphql schema").
func parseSchemaWithBaseDefinitions(schema, label string) (*ast.Document, error) {
	document, report := astparser.ParseGraphqlDocumentString(schema)
	if report.HasErrors() {
		return nil, fmt.Errorf("failed to parse %s from engine config: %w", label, report)
	}
	if err := asttransform.MergeDefinitionWithBaseSchema(&document); err != nil {
		return nil, fmt.Errorf("failed to merge %s with base schema: %w", label, err)
	}
	return &document, nil
}
// buildNatsOptions creates the list of nats.Option values for the given NATS
// event source: the connection name, reconnect jitter, logging handlers for
// connection lifecycle events and errors, and authentication when configured.
//
// Every handler logs the provider ID so that messages can be attributed to an
// event source. A token takes precedence over user info; user info is only
// applied when both username and password are set. The returned error is
// always nil.
func buildNatsOptions(eventSource config.NatsEventSource, logger *zap.Logger) ([]nats.Option, error) {
	providerID := zap.String("provider_id", eventSource.ID)

	opts := []nats.Option{
		nats.Name(fmt.Sprintf("cosmo.router.edfs.nats.%s", eventSource.ID)),
		nats.ReconnectJitter(500*time.Millisecond, 2*time.Second),
		nats.ClosedHandler(func(conn *nats.Conn) {
			logger.Info("NATS connection closed", providerID, zap.Error(conn.LastError()))
		}),
		nats.ConnectHandler(func(nc *nats.Conn) {
			logger.Info("NATS connection established", providerID, zap.String("url", nc.ConnectedUrlRedacted()))
		}),
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			if err != nil {
				logger.Error("NATS disconnected; will attempt to reconnect", zap.Error(err), providerID)
				return
			}
			logger.Info("NATS disconnected", providerID)
		}),
		nats.ErrorHandler(func(conn *nats.Conn, subscription *nats.Subscription, err error) {
			if errors.Is(err, nats.ErrSlowConsumer) {
				logger.Warn(
					"NATS slow consumer detected. Events are being dropped. Please consider increasing the buffer size or reducing the number of messages being sent.",
					zap.Error(err),
					providerID,
				)
				return
			}
			// Include the provider ID here as well, consistent with the other handlers.
			logger.Error("NATS error", zap.Error(err), providerID)
		}),
		nats.ReconnectHandler(func(conn *nats.Conn) {
			logger.Info("NATS reconnected", providerID, zap.String("url", conn.ConnectedUrlRedacted()))
		}),
	}

	if auth := eventSource.Authentication; auth != nil {
		switch {
		case auth.Token != nil:
			opts = append(opts, nats.Token(*auth.Token))
		case auth.UserInfo.Username != nil && auth.UserInfo.Password != nil:
			opts = append(opts, nats.UserInfo(*auth.UserInfo.Username, *auth.UserInfo.Password))
		}
	}

	return opts, nil
}
// buildKafkaOptions assembles the kgo.Opt list for a Kafka event source.
// It covers connection-level settings only (seed brokers, timeouts, TLS, SASL);
// per-use settings such as topics are configured in the KafkaPubSub implementation.
// The returned error is always nil.
func buildKafkaOptions(eventSource config.KafkaEventSource) ([]kgo.Opt, error) {
	kafkaOpts := []kgo.Opt{
		kgo.SeedBrokers(eventSource.Brokers...),
		// Ensure proper timeouts are set.
		kgo.ProduceRequestTimeout(10 * time.Second),
		kgo.ConnIdleTimeout(60 * time.Second),
	}

	if tlsSettings := eventSource.TLS; tlsSettings != nil && tlsSettings.Enabled {
		// An empty TLS config uses SystemCertPool for RootCAs by default.
		kafkaOpts = append(kafkaOpts, kgo.DialTLSConfig(&tls.Config{}))
	}

	if auth := eventSource.Authentication; auth != nil {
		username, password := auth.SASLPlain.Username, auth.SASLPlain.Password
		// SASL/PLAIN is only enabled when both credentials are present.
		if username != nil && password != nil {
			mechanism := plain.Auth{User: *username, Pass: *password}.AsMechanism()
			kafkaOpts = append(kafkaOpts, kgo.SASL(mechanism))
		}
	}

	return kafkaOpts, nil
}
// buildPlannerConfiguration loads the plan configuration for the given engine
// config and subgraphs and applies the debug and execution settings from
// routerEngineCfg.
//
// It returns an error when routerEngineCfg is nil or when the loader fails.
// A nil pubSubProviders is treated as "no providers configured".
func (b *ExecutorConfigurationBuilder) buildPlannerConfiguration(ctx context.Context, engineConfig *nodev1.EngineConfiguration, subgraphs []*nodev1.Subgraph, routerEngineCfg *RouterEngineConfiguration, pubSubProviders *EnginePubSubProviders) (*plan.Configuration, error) {
	if routerEngineCfg == nil {
		return nil, errors.New("router engine configuration must not be nil")
	}
	if pubSubProviders == nil {
		// Use the zero value so the factory resolver receives empty provider
		// sets instead of this function dereferencing a nil pointer.
		pubSubProviders = &EnginePubSubProviders{}
	}

	// this loader is used to take the engine config and create a plan config
	// the plan config is what the engine uses to turn a GraphQL Request into an execution plan
	// the plan config is stateful as it carries connection pools and other things
	factoryResolver := NewDefaultFactoryResolver(
		ctx,
		b.transportOptions,
		b.transport,
		b.logger,
		routerEngineCfg.Execution.EnableSingleFlight,
		routerEngineCfg.Execution.EnableNetPoll,
		pubSubProviders.nats,
		pubSubProviders.kafka,
	)
	loader := NewLoader(b.trackUsageInfo, factoryResolver)

	// this generates the plan config using the data source factories from the config package
	planConfig, err := loader.Load(engineConfig, subgraphs, routerEngineCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to load configuration: %w", err)
	}

	debug := &routerEngineCfg.Execution.Debug
	planConfig.Debug = plan.DebugConfiguration{
		PrintOperationTransformations: debug.PrintOperationTransformations,
		PrintOperationEnableASTRefs:   debug.PrintOperationEnableASTRefs,
		PrintPlanningPaths:            debug.PrintPlanningPaths,
		PrintQueryPlans:               debug.PrintIntermediateQueryPlans,
		PrintNodeSuggestions:          debug.PrintNodeSuggestions,
		ConfigurationVisitor:          debug.ConfigurationVisitor,
		PlanningVisitor:               debug.PlanningVisitor,
		DatasourceVisitor:             debug.DatasourceVisitor,
	}
	planConfig.MinifySubgraphOperations = routerEngineCfg.Execution.MinifySubgraphOperations
	planConfig.EnableOperationNamePropagation = routerEngineCfg.Execution.EnableSubgraphFetchOperationName

	return planConfig, nil
}