router/core/executor.go (184 lines of code) (raw):

package core import ( "context" "fmt" "net/http" "time" "go.uber.org/zap" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/astparser" "github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/introspection_datasource" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/pkg/config" "github.com/wundergraph/cosmo/router/pkg/grpcconnector" pubsub_datasource "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" ) type ExecutorConfigurationBuilder struct { introspection bool trackUsageInfo bool baseURL string logger *zap.Logger transportOptions *TransportOptions baseTripper http.RoundTripper subgraphTrippers map[string]http.RoundTripper pluginHost *grpcconnector.Connector subscriptionClientOptions *SubscriptionClientOptions instanceData InstanceData } type Executor struct { PlanConfig plan.Configuration // ClientSchema is the GraphQL Schema that is exposed from our API // it is used for the introspection and query normalization/validation. ClientSchema *ast.Document // RouterSchema the GraphQL Schema that we use for planning the queries RouterSchema *ast.Document Resolver *resolve.Resolver RenameTypeNames []resolve.RenameTypeName TrackUsageInfo bool } type ExecutorBuildOptions struct { EngineConfig *nodev1.EngineConfiguration Subgraphs []*nodev1.Subgraph RouterEngineConfig *RouterEngineConfiguration Reporter resolve.Reporter ApolloCompatibilityFlags config.ApolloCompatibilityFlags ApolloRouterCompatibilityFlags config.ApolloRouterCompatibilityFlags HeartbeatInterval time.Duration TraceClientRequired bool PluginsEnabled bool InstanceData InstanceData } func (b *ExecutorConfigurationBuilder) Build(ctx context.Context, opts *ExecutorBuildOptions) (*Executor, []pubsub_datasource.Provider, error) { planConfig, providers, err := b.buildPlannerConfiguration(ctx, opts.EngineConfig, opts.Subgraphs, opts.RouterEngineConfig, opts.PluginsEnabled) if err != nil { return nil, nil, fmt.Errorf("failed to build planner configuration: %w", err) } options := resolve.ResolverOptions{ MaxConcurrency: opts.RouterEngineConfig.Execution.MaxConcurrentResolvers, Debug: opts.RouterEngineConfig.Execution.Debug.EnableResolverDebugging, Reporter: opts.Reporter, PropagateSubgraphErrors: opts.RouterEngineConfig.SubgraphErrorPropagation.Enabled, PropagateSubgraphStatusCodes: opts.RouterEngineConfig.SubgraphErrorPropagation.PropagateStatusCodes, RewriteSubgraphErrorPaths: opts.RouterEngineConfig.SubgraphErrorPropagation.RewritePaths, OmitSubgraphErrorLocations: opts.RouterEngineConfig.SubgraphErrorPropagation.OmitLocations, OmitSubgraphErrorExtensions: opts.RouterEngineConfig.SubgraphErrorPropagation.OmitExtensions, AllowedErrorExtensionFields: opts.RouterEngineConfig.SubgraphErrorPropagation.AllowedExtensionFields, AttachServiceNameToErrorExtensions: opts.RouterEngineConfig.SubgraphErrorPropagation.AttachServiceName, DefaultErrorExtensionCode: opts.RouterEngineConfig.SubgraphErrorPropagation.DefaultExtensionCode, AllowedSubgraphErrorFields: opts.RouterEngineConfig.SubgraphErrorPropagation.AllowedFields, AllowAllErrorExtensionFields: opts.RouterEngineConfig.SubgraphErrorPropagation.AllowAllExtensionFields, MaxRecyclableParserSize: opts.RouterEngineConfig.Execution.ResolverMaxRecyclableParserSize, MultipartSubHeartbeatInterval: opts.HeartbeatInterval, MaxSubscriptionFetchTimeout: opts.RouterEngineConfig.Execution.SubscriptionFetchTimeout, } if opts.ApolloCompatibilityFlags.ValueCompletion.Enabled { options.ResolvableOptions.ApolloCompatibilityValueCompletionInExtensions = true } if opts.ApolloCompatibilityFlags.TruncateFloats.Enabled { options.ResolvableOptions.ApolloCompatibilityTruncateFloatValues = true } if opts.ApolloCompatibilityFlags.SuppressFetchErrors.Enabled { options.ResolvableOptions.ApolloCompatibilitySuppressFetchErrors = true } if opts.ApolloCompatibilityFlags.ReplaceInvalidVarErrors.Enabled { options.ResolvableOptions.ApolloCompatibilityReplaceInvalidVarError = true } if opts.ApolloRouterCompatibilityFlags.SubrequestHTTPError.Enabled { options.ApolloRouterCompatibilitySubrequestHTTPError = true } switch opts.RouterEngineConfig.SubgraphErrorPropagation.Mode { case config.SubgraphErrorPropagationModePassthrough: options.SubgraphErrorPropagationMode = resolve.SubgraphErrorPropagationModePassThrough case config.SubgraphErrorPropagationModeWrapped: options.SubgraphErrorPropagationMode = resolve.SubgraphErrorPropagationModeWrapped default: options.SubgraphErrorPropagationMode = resolve.SubgraphErrorPropagationModeWrapped } // this is the resolver, it's stateful and manages all the client connections, etc... resolver := resolve.New(ctx, options) var ( // clientSchemaDefinition is the GraphQL Schema that is exposed from our API // it should be used for the introspection and query normalization/validation. clientSchemaDefinition *ast.Document // routerSchemaDefinition the GraphQL Schema that we use for planning the queries routerSchemaDefinition ast.Document report operationreport.Report ) routerSchemaDefinition, report = astparser.ParseGraphqlDocumentString(opts.EngineConfig.GraphqlSchema) if report.HasErrors() { return nil, providers, fmt.Errorf("failed to parse graphql schema from engine config: %w", report) } // we need to merge the base schema, it contains the __schema and __type queries, // as well as built-in scalars like Int, String, etc... // these are usually not part of a regular GraphQL schema // the engine needs to have them defined, otherwise it cannot resolve such fields err = asttransform.MergeDefinitionWithBaseSchema(&routerSchemaDefinition) if err != nil { return nil, providers, fmt.Errorf("failed to merge graphql schema with base schema: %w", err) } if clientSchemaStr := opts.EngineConfig.GetGraphqlClientSchema(); clientSchemaStr != "" { // The client schema is a subset of the router schema that does not include @inaccessible fields. // The client schema only exists if the federated schema includes @inaccessible directives or @tag directives clientSchema, report := astparser.ParseGraphqlDocumentString(clientSchemaStr) if report.HasErrors() { return nil, providers, fmt.Errorf("failed to parse graphql client schema from engine config: %w", report) } err = asttransform.MergeDefinitionWithBaseSchema(&clientSchema) if err != nil { return nil, providers, fmt.Errorf("failed to merge graphql client schema with base schema: %w", err) } clientSchemaDefinition = &clientSchema } else { // In the event that a client schema is not generated, the router schema is used in place of the client schema (e.g., for operation validation) clientSchemaDefinition = &routerSchemaDefinition } if b.introspection { // by default, the engine doesn't understand how to resolve the __schema and __type queries // we need to add a special datasource for that // it takes the definition as the input and generates introspection data // datasource is attached to Query.__schema, Query.__type, __Type.fields and __Type.enumValues fields introspectionFactory, err := introspection_datasource.NewIntrospectionConfigFactory(clientSchemaDefinition) if err != nil { return nil, providers, fmt.Errorf("failed to create introspection config factory: %w", err) } fieldConfigs := introspectionFactory.BuildFieldConfigurations() // we need to add these fields to the config // otherwise the engine wouldn't know how to resolve them planConfig.Fields = append(planConfig.Fields, fieldConfigs...) dataSources := introspectionFactory.BuildDataSourceConfigurations() // finally, we add our data source for introspection to the existing data sources planConfig.DataSources = append(planConfig.DataSources, dataSources...) } var renameTypeNames []resolve.RenameTypeName // when applying namespacing, it's possible that we need to rename types // for that, we have to map the rename types config to the engine's rename type names for _, configuration := range planConfig.Types { if configuration.RenameTo != "" { renameTypeNames = append(renameTypeNames, resolve.RenameTypeName{ From: []byte(configuration.RenameTo), To: []byte(configuration.TypeName), }) } } return &Executor{ PlanConfig: *planConfig, ClientSchema: clientSchemaDefinition, RouterSchema: &routerSchemaDefinition, Resolver: resolver, RenameTypeNames: renameTypeNames, TrackUsageInfo: b.trackUsageInfo, }, providers, nil } func (b *ExecutorConfigurationBuilder) buildPlannerConfiguration(ctx context.Context, engineConfig *nodev1.EngineConfiguration, subgraphs []*nodev1.Subgraph, routerEngineCfg *RouterEngineConfiguration, pluginsEnabled bool) (*plan.Configuration, []pubsub_datasource.Provider, error) { // this loader is used to take the engine config and create a plan config // the plan config is what the engine uses to turn a GraphQL Request into an execution plan // the plan config is stateful as it carries connection pools and other things loader := NewLoader(ctx, b.trackUsageInfo, NewDefaultFactoryResolver( ctx, b.transportOptions, b.subscriptionClientOptions, b.baseTripper, b.subgraphTrippers, b.pluginHost, b.logger, routerEngineCfg.Execution.EnableSingleFlight, routerEngineCfg.Execution.EnableNetPoll, b.instanceData, ), b.logger) // this generates the plan config using the data source factories from the config package planConfig, providers, err := loader.Load(engineConfig, subgraphs, routerEngineCfg, pluginsEnabled) if err != nil { return nil, nil, fmt.Errorf("failed to load configuration: %w", err) } debug := &routerEngineCfg.Execution.Debug planConfig.Debug = plan.DebugConfiguration{ PrintOperationTransformations: debug.PrintOperationTransformations, PrintOperationEnableASTRefs: debug.PrintOperationEnableASTRefs, PrintPlanningPaths: debug.PrintPlanningPaths, PrintQueryPlans: debug.PrintIntermediateQueryPlans, PrintNodeSuggestions: debug.PrintNodeSuggestions, ConfigurationVisitor: debug.ConfigurationVisitor, PlanningVisitor: debug.PlanningVisitor, DatasourceVisitor: debug.DatasourceVisitor, } planConfig.MinifySubgraphOperations = routerEngineCfg.Execution.MinifySubgraphOperations planConfig.EnableOperationNamePropagation = routerEngineCfg.Execution.EnableSubgraphFetchOperationName return planConfig, providers, nil }