router/core/plan_generator.go (203 lines of code) (raw):

package core import ( "context" "errors" "fmt" "net/http" "os" log "github.com/jensneuse/abstractlogger" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/astnormalization" "github.com/wundergraph/graphql-go-tools/v2/pkg/astparser" "github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/graphql_datasource" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/introspection_datasource" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/pubsub_datasource" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/postprocess" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport" "go.uber.org/zap" "github.com/wundergraph/cosmo/router/pkg/config" "github.com/wundergraph/cosmo/router/pkg/execution_config" ) type PlanGenerator struct { planConfiguration *plan.Configuration definition *ast.Document } type Planner struct { planner *plan.Planner definition *ast.Document } func NewPlanner(planConfiguration *plan.Configuration, definition *ast.Document) (*Planner, error) { planner, err := plan.NewPlanner(*planConfiguration) if err != nil { return nil, fmt.Errorf("failed to create planner: %w", err) } return &Planner{ planner: planner, definition: definition, }, nil } func (pl *Planner) PlanOperation(operationFilePath string) (string, error) { operation, err := pl.parseOperation(operationFilePath) if err != nil { return "", fmt.Errorf("failed to parse operation: %w", err) } rawPlan, err := pl.planOperation(operation) if err != nil { return "", fmt.Errorf("failed to plan operation: %w", err) } return rawPlan.PrettyPrint(), nil } func (pl *Planner) PlanParsedOperation(operation *ast.Document) (*resolve.FetchTreeQueryPlanNode, error) { rawPlan, err := pl.planOperation(operation) if err != nil { return nil, fmt.Errorf("failed to plan operation: %w", err) } return rawPlan, nil } func (pl *Planner) planOperation(operation *ast.Document) (*resolve.FetchTreeQueryPlanNode, error) { report := operationreport.Report{} var operationName []byte for i := range operation.RootNodes { if operation.RootNodes[i].Kind == ast.NodeKindOperationDefinition { operationName = operation.OperationDefinitionNameBytes(operation.RootNodes[i].Ref) break } } if operationName == nil { return nil, errors.New("operation name not found") } astnormalization.NormalizeNamedOperation(operation, pl.definition, operationName, &report) // create and postprocess the plan preparedPlan := pl.planner.Plan(operation, pl.definition, string(operationName), &report, plan.IncludeQueryPlanInResponse()) if report.HasErrors() { return nil, errors.New(report.Error()) } post := postprocess.NewProcessor() post.Process(preparedPlan) if p, ok := preparedPlan.(*plan.SynchronousResponsePlan); ok { return p.Response.Fetches.QueryPlan(), nil } return &resolve.FetchTreeQueryPlanNode{}, nil } func (pl *Planner) parseOperation(operationFilePath string) (*ast.Document, error) { content, err := os.ReadFile(operationFilePath) if err != nil { return nil, err } doc, report := astparser.ParseGraphqlDocumentBytes(content) if report.HasErrors() { return nil, errors.New(report.Error()) } return &doc, nil } func NewPlanGenerator(configFilePath string, logger *zap.Logger, maxDataSourceCollectorsConcurrency uint) (*PlanGenerator, error) { pg := &PlanGenerator{} routerConfig, err := pg.buildRouterConfig(configFilePath) if err != nil { return nil, err } if err := pg.loadConfiguration( routerConfig, logger, maxDataSourceCollectorsConcurrency, ); err != nil { return nil, err } return pg, nil } func NewPlanGeneratorFromConfig(config *nodev1.RouterConfig, logger *zap.Logger, maxDataSourceCollectorsConcurrency uint) (*PlanGenerator, error) { pg := &PlanGenerator{} if err := pg.loadConfiguration(config, logger, maxDataSourceCollectorsConcurrency); err != nil { return nil, err } return pg, nil } func (pg *PlanGenerator) GetPlanner() (*Planner, error) { return NewPlanner(pg.planConfiguration, pg.definition) } func (pg *PlanGenerator) buildRouterConfig(configFilePath string) (*nodev1.RouterConfig, error) { routerConfig, err := execution_config.FromFile(configFilePath) if err != nil { return nil, err } return routerConfig, nil } func (pg *PlanGenerator) loadConfiguration(routerConfig *nodev1.RouterConfig, logger *zap.Logger, maxDataSourceCollectorsConcurrency uint) error { natSources := map[string]pubsub_datasource.NatsPubSub{} kafkaSources := map[string]pubsub_datasource.KafkaPubSub{} for _, ds := range routerConfig.GetEngineConfig().GetDatasourceConfigurations() { if ds.GetKind() != nodev1.DataSourceKind_PUBSUB || ds.GetCustomEvents() == nil { continue } for _, natConfig := range ds.GetCustomEvents().GetNats() { providerId := natConfig.GetEngineEventConfiguration().GetProviderId() if _, ok := natSources[providerId]; !ok { natSources[providerId] = nil } } for _, kafkaConfig := range ds.GetCustomEvents().GetKafka() { providerId := kafkaConfig.GetEngineEventConfiguration().GetProviderId() if _, ok := kafkaSources[providerId]; !ok { kafkaSources[providerId] = nil } } } pubSubFactory := pubsub_datasource.NewFactory(context.Background(), natSources, kafkaSources) var netPollConfig graphql_datasource.NetPollConfiguration netPollConfig.ApplyDefaults() subscriptionClient := graphql_datasource.NewGraphQLSubscriptionClient( http.DefaultClient, http.DefaultClient, context.Background(), graphql_datasource.WithLogger(log.NoopLogger), graphql_datasource.WithNetPollConfiguration(netPollConfig), ) loader := NewLoader(false, &DefaultFactoryResolver{ engineCtx: context.Background(), httpClient: http.DefaultClient, streamingClient: http.DefaultClient, subscriptionClient: subscriptionClient, transportOptions: &TransportOptions{SubgraphTransportOptions: NewSubgraphTransportOptions(config.TrafficShapingRules{})}, pubsub: pubSubFactory, }) // this generates the plan configuration using the data source factories from the config package planConfig, err := loader.Load(routerConfig.GetEngineConfig(), routerConfig.GetSubgraphs(), &RouterEngineConfiguration{}) if err != nil { return fmt.Errorf("failed to load configuration: %w", err) } planConfig.Debug = plan.DebugConfiguration{ PrintOperationTransformations: false, PrintOperationEnableASTRefs: false, PrintPlanningPaths: false, PrintQueryPlans: false, PrintNodeSuggestions: false, ConfigurationVisitor: false, PlanningVisitor: false, DatasourceVisitor: false, } planConfig.MaxDataSourceCollectorsConcurrency = maxDataSourceCollectorsConcurrency if logger != nil { planConfig.Logger = log.NewZapLogger(logger, log.DebugLevel) } // this is the GraphQL Schema that we will expose from our API definition, report := astparser.ParseGraphqlDocumentString(routerConfig.EngineConfig.GraphqlSchema) if report.HasErrors() { return fmt.Errorf("failed to parse graphql schema from engine config: %w", report) } // we need to merge the base schema, it contains the __schema and __type queries // these are not usually part of a regular GraphQL schema // the engine needs to have them defined, otherwise it cannot resolve such fields err = asttransform.MergeDefinitionWithBaseSchema(&definition) if err != nil { return fmt.Errorf("failed to merge graphql schema with base schema: %w", err) } // by default, the engine doesn't understand how to resolve the __schema and __type queries // we need to add a special datasource for that // it takes the definition as the input and generates introspection data // datasource is attached to Query.__schema, Query.__type, __Type.fields and __Type.enumValues fields introspectionFactory, err := introspection_datasource.NewIntrospectionConfigFactory(&definition) if err != nil { return fmt.Errorf("failed to create introspection config factory: %w", err) } dataSources := introspectionFactory.BuildDataSourceConfigurations() fieldConfigs := introspectionFactory.BuildFieldConfigurations() // we need to add these fields to the config // otherwise the engine wouldn't know how to resolve them planConfig.Fields = append(planConfig.Fields, fieldConfigs...) // finally, we add our data source for introspection to the existing data sources planConfig.DataSources = append(planConfig.DataSources, dataSources...) pg.planConfiguration = planConfig pg.definition = &definition return nil }