router/pkg/pubsub/datasource/planner.go (169 lines of code) (raw):
package datasource
import (
"fmt"
"strings"
"github.com/wundergraph/cosmo/router/pkg/pubsub/eventdata"
"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argument_templates"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
)
// Planner is the plan-time visitor for a pubsub data source. It records the
// root field of the operation being planned and produces the fetch or
// subscription configuration for it via ConfigureFetch / ConfigureSubscription.
type Planner[PB ProviderBuilder[P, E], P any, E any] struct {
// id is the planner identifier assigned through SetID and returned by ID.
id int
// config is the custom data source configuration captured in Register.
config *PlannerConfig[PB, P, E]
// rootFieldRef is the operation ref of the first field entered during the
// walk; EnterDocument resets it to -1 ("not seen yet").
rootFieldRef int
// variables collects the context variables registered while argument
// templates are replaced; it is handed to the resulting configuration.
variables resolve.Variables
// visitor is the planning visitor passed to Register.
visitor *plan.Visitor
// extractFn replaces argument templates in a string with variable
// placeholders; it is bound to the root field in EnterField.
extractFn func(tpl string) (string, error)
}
// SetID stores the identifier assigned to this planner.
func (p *Planner[PB, P, E]) SetID(id int) {
	p.id = id
}
// ID returns the identifier previously stored with SetID.
func (p *Planner[PB, P, E]) ID() (id int) {
	id = p.id
	return id
}
// DownstreamResponseFieldAlias always reports that no alias exists: this
// planner does not need downstream field aliases, so it returns the empty
// string and false for every field ref.
func (p *Planner[PB, P, E]) DownstreamResponseFieldAlias(downstreamFieldRef int) (alias string, exists bool) {
	return "", false
}
// DataSourcePlanningBehavior reports the planning behavior of this data
// source: aliased root nodes are not merged and field paths are not
// overridden from aliases.
func (p *Planner[PB, P, E]) DataSourcePlanningBehavior() plan.DataSourcePlanningBehavior {
	var behavior plan.DataSourcePlanningBehavior
	behavior.MergeAliasedRootNodes = false
	behavior.OverrideFieldPathFromAlias = false
	return behavior
}
// Register wires the planner into the given visitor. It keeps a reference to
// the visitor and to the data source's custom configuration, and subscribes
// the planner to the walker's enter-field and enter-document events.
// It always returns nil.
func (p *Planner[PB, P, E]) Register(visitor *plan.Visitor, configuration plan.DataSourceConfiguration[*PlannerConfig[PB, P, E]], _ plan.DataSourcePlannerConfiguration) error {
	// Capture planner state first.
	p.visitor = visitor
	p.config = configuration.CustomConfiguration()

	// Then hook into the walk.
	visitor.Walker.RegisterEnterFieldVisitor(p)
	visitor.Walker.RegisterEnterDocumentVisitor(p)

	return nil
}
// ConfigureFetch builds the fetch configuration for the root field recorded
// during the walk.
//
// It builds the engine data source from the configured event, applies the
// argument-template transformation to the event data, resolves the data
// source, and renders the data source input from the event bytes built for
// the root field. On any failure the walker is stopped with an internal error
// and the zero FetchConfiguration is returned.
func (p *Planner[PB, P, E]) ConfigureFetch() resolve.FetchConfiguration {
	if p.config == nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("data source not set"))
		return resolve.FetchConfiguration{}
	}

	pubSubDataSource, err := p.config.ProviderBuilder.BuildEngineDataSourceFactory(p.config.Event)
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to build data source: %w", err))
		return resolve.FetchConfiguration{}
	}

	err = pubSubDataSource.TransformEventData(p.extractFn)
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(err)
		// Bail out like every other failure path: continuing would plan a fetch
		// against event data whose argument templates were never replaced.
		return resolve.FetchConfiguration{}
	}

	dataSource, err := pubSubDataSource.ResolveDataSource()
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to get data source: %w", err))
		return resolve.FetchConfiguration{}
	}

	event, err := eventdata.BuildEventDataBytes(p.rootFieldRef, p.visitor.Operation, &p.variables)
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to get resolve data source input: %w", err))
		return resolve.FetchConfiguration{}
	}

	input, err := pubSubDataSource.ResolveDataSourceInput(event)
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to get resolve data source input: %w", err))
		return resolve.FetchConfiguration{}
	}

	return resolve.FetchConfiguration{
		Input:      input,
		Variables:  p.variables,
		DataSource: dataSource,
		PostProcessing: resolve.PostProcessingConfiguration{
			// The response is merged under the name of the pubsub field.
			MergePath: []string{pubSubDataSource.GetFieldName()},
		},
	}
}
// ConfigureSubscription builds the subscription configuration for the root
// field recorded during the walk.
//
// It builds the engine data source from the configured event, applies the
// argument-template transformation to the event data, and resolves both the
// subscription data source and its input. On any failure the walker is
// stopped with an internal error and the zero SubscriptionConfiguration is
// returned.
func (p *Planner[PB, P, E]) ConfigureSubscription() plan.SubscriptionConfiguration {
	if p.config == nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("data source not set"))
		return plan.SubscriptionConfiguration{}
	}

	pubSubDataSource, err := p.config.ProviderBuilder.BuildEngineDataSourceFactory(p.config.Event)
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to get resolve data source subscription: %w", err))
		return plan.SubscriptionConfiguration{}
	}

	err = pubSubDataSource.TransformEventData(p.extractFn)
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(err)
		// Bail out like every other failure path: continuing would plan a
		// subscription against event data whose argument templates were never
		// replaced.
		return plan.SubscriptionConfiguration{}
	}

	dataSource, err := pubSubDataSource.ResolveDataSourceSubscription()
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to get resolve data source subscription: %w", err))
		return plan.SubscriptionConfiguration{}
	}

	input, err := pubSubDataSource.ResolveDataSourceSubscriptionInput()
	if err != nil {
		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to get resolve data source subscription input: %w", err))
		return plan.SubscriptionConfiguration{}
	}

	return plan.SubscriptionConfiguration{
		Input:      input,
		Variables:  p.variables,
		DataSource: dataSource,
		PostProcessing: resolve.PostProcessingConfiguration{
			// The response is merged under the name of the pubsub field.
			MergePath: []string{pubSubDataSource.GetFieldName()},
		},
	}
}
// addContextVariableByArgumentRef registers a context variable for the given
// field argument and returns the placeholder that stands for it.
//
// The variable path is looked up on the operation from argumentRef,
// argumentPath and operationTypeRef; a lookup failure is returned unchanged.
// The variable is rendered with the plain variable renderer.
func (p *Planner[PB, P, E]) addContextVariableByArgumentRef(argumentRef int, operationTypeRef int, argumentPath []string) (string, error) {
	path, err := p.visitor.Operation.VariablePathByArgumentRefAndArgumentPath(argumentRef, argumentPath, operationTypeRef)
	if err != nil {
		return "", err
	}

	// Only the placeholder is needed; AddVariable's second result is
	// intentionally discarded.
	placeholder, _ := p.variables.AddVariable(&resolve.ContextVariable{
		Path:     path,
		Renderer: resolve.NewPlainVariableRenderer(),
	})
	return placeholder, nil
}
// extractArgumentTemplate replaces every argument template found in template
// with a variable placeholder and returns the rewritten string.
//
// fieldRef is the operation field the template belongs to,
// operationDefinitionRef the enclosing operation, and typeDefinitionRef the
// object type definition that declares the field. A template without any
// argument templates is returned unchanged. An error is returned when the
// field definition cannot be found, an argument path fails validation, the
// operation field does not pass the referenced argument, or no variable
// placeholder can be created for it.
func (p *Planner[PB, P, E]) extractArgumentTemplate(fieldRef int, operationDefinitionRef int, typeDefinitionRef int, template string) (string, error) {
	matches := argument_templates.ArgumentTemplateRegex.FindAllStringSubmatch(template, -1)
	// If no argument templates are defined, there are only static values
	if len(matches) < 1 {
		return template, nil
	}

	fieldNameBytes := p.visitor.Operation.FieldNameBytes(fieldRef)
	// TODO: handling for interfaces and unions
	fieldDefinitionRef, ok := p.visitor.Definition.ObjectTypeDefinitionFieldWithName(typeDefinitionRef, fieldNameBytes)
	if !ok {
		return "", fmt.Errorf(`expected field definition to exist for field "%s"`, fieldNameBytes)
	}

	templateWithVariableTemplateReplacements := template
	for templateNumber, groups := range matches {
		// The first group is the whole template; the second is the period-delimited argument path
		if len(groups) != 2 {
			// Report the number of groups actually received so the message agrees with the check above.
			return "", fmt.Errorf(`argument template #%d defined on field "%s" is invalid: expected 2 matching groups but received %d`, templateNumber+1, fieldNameBytes, len(groups))
		}

		validationResult, err := argument_templates.ValidateArgumentPath(p.visitor.Definition, groups[1], fieldDefinitionRef)
		if err != nil {
			return "", fmt.Errorf(`argument template #%d defined on field "%s" is invalid: %w`, templateNumber+1, fieldNameBytes, err)
		}

		// The first path segment is the name of the field argument itself.
		argumentNameBytes := []byte(validationResult.ArgumentPath[0])
		argumentRef, ok := p.visitor.Operation.FieldArgument(fieldRef, argumentNameBytes)
		if !ok {
			return "", fmt.Errorf(`operation field "%s" does not define argument "%s"`, fieldNameBytes, argumentNameBytes)
		}

		// variablePlaceholder has the form $$0$$, $$1$$, etc.
		variablePlaceholder, err := p.addContextVariableByArgumentRef(argumentRef, operationDefinitionRef, validationResult.ArgumentPath)
		if err != nil {
			return "", fmt.Errorf(`failed to retrieve variable placeholder for argument "%s" defined on operation field "%s": %w`, argumentNameBytes, fieldNameBytes, err)
		}

		// Replace the template literal with the variable placeholder (and reuse the variable if it already exists)
		templateWithVariableTemplateReplacements = strings.ReplaceAll(templateWithVariableTemplateReplacements, groups[0], variablePlaceholder)
	}

	return templateWithVariableTemplateReplacements, nil
}
// EnterDocument runs at the start of a document walk and marks the root field
// as not yet seen (-1), so that the next field entered is treated as the root.
func (p *Planner[PB, P, E]) EnterDocument(_, _ *ast.Document) {
	p.rootFieldRef = -1
}
// EnterField records the first field entered after EnterDocument as the root
// field and binds extractFn to it. Every later field is ignored.
func (p *Planner[PB, P, E]) EnterField(ref int) {
	// A root field has already been recorded, so this one is nested.
	if p.rootFieldRef != -1 {
		return
	}
	p.rootFieldRef = ref

	// Snapshot the walker position now: the closure below is invoked later,
	// when the walker state may have changed.
	var (
		opRef            = p.visitor.Walker.Ancestors[0].Ref
		enclosingTypeRef = p.visitor.Walker.EnclosingTypeDefinition.Ref
	)
	p.extractFn = func(tpl string) (string, error) {
		return p.extractArgumentTemplate(ref, opRef, enclosingTypeRef, tpl)
	}
}