router/core/operation_planner.go (130 lines of code) (raw):

package core import ( "errors" "strconv" "golang.org/x/sync/singleflight" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/astparser" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/postprocess" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" graphqlmetricsv1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/graphqlmetrics/v1" "github.com/wundergraph/cosmo/router/pkg/graphqlschemausage" ) type planWithMetaData struct { preparedPlan plan.Plan operationDocument, schemaDocument *ast.Document typeFieldUsageInfo []*graphqlmetricsv1.TypeFieldUsageInfo argumentUsageInfo []*graphqlmetricsv1.ArgumentUsageInfo } type OperationPlanner struct { sf singleflight.Group planCache ExecutionPlanCache[uint64, *planWithMetaData] executor *Executor trackUsageInfo bool } type ExecutionPlanCache[K any, V any] interface { // Get the value from the cache Get(key K) (V, bool) // Set the value in the cache with a cost. The cost depends on the cache implementation Set(key K, value V, cost int64) bool // Close the cache and free resources Close() } func NewOperationPlanner(executor *Executor, planCache ExecutionPlanCache[uint64, *planWithMetaData]) *OperationPlanner { return &OperationPlanner{ planCache: planCache, executor: executor, trackUsageInfo: executor.TrackUsageInfo, } } func (p *OperationPlanner) preparePlan(ctx *operationContext) (*planWithMetaData, error) { doc, report := astparser.ParseGraphqlDocumentString(ctx.content) if report.HasErrors() { return nil, &reportError{report: &report} } planner, err := plan.NewPlanner(p.executor.PlanConfig) if err != nil { return nil, err } var ( preparedPlan plan.Plan ) // create and postprocess the plan // planning uses the router schema if ctx.executionOptions.IncludeQueryPlanInResponse { preparedPlan = planner.Plan(&doc, p.executor.RouterSchema, ctx.name, &report, plan.IncludeQueryPlanInResponse()) } else { preparedPlan = planner.Plan(&doc, p.executor.RouterSchema, ctx.name, &report) } if report.HasErrors() { return nil, &reportError{report: &report} } post := postprocess.NewProcessor(postprocess.CollectDataSourceInfo()) post.Process(preparedPlan) out := &planWithMetaData{ preparedPlan: preparedPlan, operationDocument: &doc, schemaDocument: p.executor.RouterSchema, } if p.trackUsageInfo { out.typeFieldUsageInfo = graphqlschemausage.GetTypeFieldUsageInfo(preparedPlan) out.argumentUsageInfo, err = graphqlschemausage.GetArgumentUsageInfo(&doc, p.executor.RouterSchema) if err != nil { return nil, err } } return out, nil } type PlanOptions struct { ClientInfo *ClientInfo TraceOptions resolve.TraceOptions ExecutionOptions resolve.ExecutionOptions TrackSchemaUsageInfo bool } func (p *OperationPlanner) plan(opContext *operationContext, options PlanOptions) (err error) { // if we have tracing enabled or want to include a query plan in the response we always prepare a new plan // this is because in case of tracing, we're writing trace data to the plan // in case of including the query plan, we don't want to cache this additional overhead skipCache := options.TraceOptions.Enable || options.ExecutionOptions.IncludeQueryPlanInResponse if skipCache { prepared, err := p.preparePlan(opContext) if err != nil { return err } opContext.preparedPlan = prepared if options.TrackSchemaUsageInfo { opContext.typeFieldUsageInfo = prepared.typeFieldUsageInfo opContext.argumentUsageInfo = prepared.argumentUsageInfo opContext.inputUsageInfo, err = graphqlschemausage.GetInputUsageInfo(prepared.operationDocument, p.executor.RouterSchema, opContext.variables) if err != nil { return err } } return nil } operationID := opContext.internalHash // try to get a prepared plan for this operation ID from the cache cachedPlan, ok := p.planCache.Get(operationID) if ok && cachedPlan != nil { // re-use a prepared plan opContext.preparedPlan = cachedPlan opContext.planCacheHit = true } else { // prepare a new plan using single flight // this ensures that we only prepare the plan once for this operation ID operationIDStr := strconv.FormatUint(operationID, 10) sharedPreparedPlan, err, _ := p.sf.Do(operationIDStr, func() (interface{}, error) { prepared, err := p.preparePlan(opContext) if err != nil { return nil, err } p.planCache.Set(operationID, prepared, 1) return prepared, nil }) if err != nil { return err } opContext.preparedPlan, ok = sharedPreparedPlan.(*planWithMetaData) if !ok { return errors.New("unexpected prepared plan type") } } if options.TrackSchemaUsageInfo { opContext.typeFieldUsageInfo = opContext.preparedPlan.typeFieldUsageInfo opContext.argumentUsageInfo = opContext.preparedPlan.argumentUsageInfo opContext.inputUsageInfo, err = graphqlschemausage.GetInputUsageInfo(opContext.preparedPlan.operationDocument, p.executor.RouterSchema, opContext.variables) if err != nil { return err } } return nil }