func()

in router/core/factoryresolver.go [222:470]


// Load translates the router's engine configuration into a plan.Configuration.
//
// It copies the default flush interval, converts every field and type
// configuration, and builds one plan.DataSource for each entry in
// engineConfig.DatasourceConfigurations. Static, GraphQL and pub/sub data
// sources are supported. subgraphs is passed to l.subgraphName and, together
// with routerEngineConfig.Headers, to FetchURLRules when a GraphQL data source
// is configured.
//
// Optional nested protobuf messages are read through their generated getters,
// so a configuration that omits one of them does not cause a nil-pointer
// dereference. A STATIC or GRAPHQL data source that lacks its kind-specific
// configuration block is rejected with an error.
//
// An error is also returned when a factory cannot be resolved, when any part
// of a data source configuration cannot be constructed, or when the data
// source kind is not one of the supported kinds.
func (l *Loader) Load(engineConfig *nodev1.EngineConfiguration, subgraphs []*nodev1.Subgraph, routerEngineConfig *RouterEngineConfiguration) (*plan.Configuration, error) {
	var outConfig plan.Configuration
	outConfig.DefaultFlushIntervalMillis = engineConfig.DefaultFlushInterval

	// attach field usage information to the plan
	for _, configuration := range engineConfig.FieldConfigurations {
		var args []plan.ArgumentConfiguration
		for _, argumentConfiguration := range configuration.ArgumentsConfiguration {
			arg := plan.ArgumentConfiguration{
				Name: argumentConfiguration.Name,
			}
			// Any other source type leaves arg.SourceType at its zero value.
			switch argumentConfiguration.SourceType {
			case nodev1.ArgumentSource_FIELD_ARGUMENT:
				arg.SourceType = plan.FieldArgumentSource
			case nodev1.ArgumentSource_OBJECT_FIELD:
				arg.SourceType = plan.ObjectFieldSource
			}
			args = append(args, arg)
		}
		fieldConfig := plan.FieldConfiguration{
			TypeName:                    configuration.TypeName,
			FieldName:                   configuration.FieldName,
			Arguments:                   args,
			HasAuthorizationRule:        l.fieldHasAuthorizationRule(configuration),
			SubscriptionFilterCondition: mapProtoFilterToPlanFilter(configuration.SubscriptionFilterCondition, &plan.SubscriptionFilterCondition{}),
		}
		outConfig.Fields = append(outConfig.Fields, fieldConfig)
	}

	for _, configuration := range engineConfig.TypeConfigurations {
		outConfig.Types = append(outConfig.Types, plan.TypeConfiguration{
			TypeName: configuration.TypeName,
			RenameTo: configuration.RenameTo,
		})
	}

	for _, in := range engineConfig.DatasourceConfigurations {
		var out plan.DataSource

		switch in.Kind {
		case nodev1.DataSourceKind_STATIC:
			staticConfig := in.GetCustomStatic()
			if staticConfig == nil {
				return nil, fmt.Errorf("missing static configuration for data source %s", in.Id)
			}

			factory, err := l.resolver.ResolveStaticFactory()
			if err != nil {
				return nil, err
			}

			out, err = plan.NewDataSourceConfiguration[staticdatasource.Configuration](
				in.Id,
				factory,
				l.dataSourceMetaData(in),
				staticdatasource.Configuration{
					Data: config.LoadStringVariable(staticConfig.GetData()),
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error creating data source configuration for data source %s: %w", in.Id, err)
			}

		case nodev1.DataSourceKind_GRAPHQL:
			graphqlConfig := in.GetCustomGraphql()
			if graphqlConfig == nil {
				return nil, fmt.Errorf("missing GraphQL configuration for data source %s", in.Id)
			}
			// Each of these nested messages may be nil; they are only read
			// through getters or behind an explicit nil check below.
			fetchConfig := graphqlConfig.GetFetch()
			subscriptionConfig := graphqlConfig.GetSubscription()
			federationConfig := graphqlConfig.GetFederation()

			header := http.Header{}
			for s, httpHeader := range fetchConfig.GetHeader() {
				for _, value := range httpHeader.GetValues() {
					header.Add(s, config.LoadStringVariable(value))
				}
			}

			fetchUrl := config.LoadStringVariable(fetchConfig.GetUrl())

			// Subscriptions fall back to the fetch URL when no dedicated URL is set.
			subscriptionUrl := config.LoadStringVariable(subscriptionConfig.GetUrl())
			if subscriptionUrl == "" {
				subscriptionUrl = fetchUrl
			}

			customScalarTypeFields := make([]graphql_datasource.SingleTypeField, len(graphqlConfig.GetCustomScalarTypeFields()))
			for i, v := range graphqlConfig.GetCustomScalarTypeFields() {
				customScalarTypeFields[i] = graphql_datasource.SingleTypeField{
					TypeName:  v.GetTypeName(),
					FieldName: v.GetFieldName(),
				}
			}

			graphqlSchema, err := l.LoadInternedString(engineConfig, graphqlConfig.GetUpstreamSchema())
			if err != nil {
				return nil, fmt.Errorf("could not load GraphQL schema for data source %s: %w", in.Id, err)
			}

			var subscriptionUseSSE bool
			var subscriptionSSEMethodPost bool
			if subscriptionConfig != nil {
				if subscriptionConfig.Protocol != nil {
					switch *subscriptionConfig.Protocol {
					case common.GraphQLSubscriptionProtocol_GRAPHQL_SUBSCRIPTION_PROTOCOL_WS:
						subscriptionUseSSE = false
						subscriptionSSEMethodPost = false
					case common.GraphQLSubscriptionProtocol_GRAPHQL_SUBSCRIPTION_PROTOCOL_SSE:
						subscriptionUseSSE = true
						subscriptionSSEMethodPost = false
					case common.GraphQLSubscriptionProtocol_GRAPHQL_SUBSCRIPTION_PROTOCOL_SSE_POST:
						subscriptionUseSSE = true
						subscriptionSSEMethodPost = true
					}
				} else if subscriptionConfig.UseSSE != nil {
					// Old style config
					subscriptionUseSSE = *subscriptionConfig.UseSSE
				}
			}

			wsSubprotocol := "auto"
			if subscriptionConfig != nil && subscriptionConfig.WebsocketSubprotocol != nil {
				switch *subscriptionConfig.WebsocketSubprotocol {
				case common.GraphQLWebsocketSubprotocol_GRAPHQL_WEBSOCKET_SUBPROTOCOL_WS:
					wsSubprotocol = "graphql-ws"
				case common.GraphQLWebsocketSubprotocol_GRAPHQL_WEBSOCKET_SUBPROTOCOL_TRANSPORT_WS:
					wsSubprotocol = "graphql-transport-ws"
				case common.GraphQLWebsocketSubprotocol_GRAPHQL_WEBSOCKET_SUBPROTOCOL_AUTO:
					wsSubprotocol = "auto"
				}
			}

			dataSourceRules := FetchURLRules(routerEngineConfig.Headers, subgraphs, subscriptionUrl)
			forwardedClientHeaders, forwardedClientRegexps, err := PropagatedHeaders(dataSourceRules)
			if err != nil {
				return nil, fmt.Errorf("error parsing header rules for data source %s: %w", in.Id, err)
			}

			schemaConfiguration, err := graphql_datasource.NewSchemaConfiguration(
				graphqlSchema,
				&graphql_datasource.FederationConfiguration{
					Enabled:    federationConfig.GetEnabled(),
					ServiceSDL: federationConfig.GetServiceSdl(),
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error creating schema configuration for data source %s: %w", in.Id, err)
			}

			customConfiguration, err := graphql_datasource.NewConfiguration(graphql_datasource.ConfigurationInput{
				Fetch: &graphql_datasource.FetchConfiguration{
					URL:    fetchUrl,
					Method: fetchConfig.GetMethod().String(),
					Header: header,
				},
				Subscription: &graphql_datasource.SubscriptionConfiguration{
					URL:                                     subscriptionUrl,
					UseSSE:                                  subscriptionUseSSE,
					SSEMethodPost:                           subscriptionSSEMethodPost,
					ForwardedClientHeaderNames:              forwardedClientHeaders,
					ForwardedClientHeaderRegularExpressions: forwardedClientRegexps,
					WsSubProtocol:                           wsSubprotocol,
				},
				SchemaConfiguration:    schemaConfiguration,
				CustomScalarTypeFields: customScalarTypeFields,
			})
			if err != nil {
				return nil, fmt.Errorf("error creating custom configuration for data source %s: %w", in.Id, err)
			}

			dataSourceName := l.subgraphName(subgraphs, in.Id)

			factory, err := l.resolver.ResolveGraphqlFactory(dataSourceName)
			if err != nil {
				return nil, err
			}

			out, err = plan.NewDataSourceConfigurationWithName[graphql_datasource.Configuration](
				in.Id,
				dataSourceName,
				factory,
				l.dataSourceMetaData(in),
				customConfiguration,
			)
			if err != nil {
				return nil, fmt.Errorf("error creating data source configuration for data source %s: %w", in.Id, err)
			}

		case nodev1.DataSourceKind_PUBSUB:
			var eventConfigurations []pubsub_datasource.EventConfiguration

			for _, eventConfiguration := range in.GetCustomEvents().GetNats() {
				engineEventConfig := eventConfiguration.GetEngineEventConfiguration()
				eventTypeName := engineEventConfig.GetType().String()

				eventType, err := pubsub_datasource.EventTypeFromString(eventTypeName)
				if err != nil {
					return nil, fmt.Errorf("invalid event type %q for data source %q: %w", eventTypeName, in.Id, err)
				}

				// The stream configuration stays nil when the event has none.
				var streamConfiguration *pubsub_datasource.NatsStreamConfiguration
				if streamConfig := eventConfiguration.GetStreamConfiguration(); streamConfig != nil {
					streamConfiguration = &pubsub_datasource.NatsStreamConfiguration{
						Consumer:                  streamConfig.GetConsumerName(),
						StreamName:                streamConfig.GetStreamName(),
						ConsumerInactiveThreshold: streamConfig.GetConsumerInactiveThreshold(),
					}
				}

				eventConfigurations = append(eventConfigurations, pubsub_datasource.EventConfiguration{
					Metadata: &pubsub_datasource.EventMetadata{
						ProviderID: engineEventConfig.GetProviderId(),
						Type:       eventType,
						TypeName:   engineEventConfig.GetTypeName(),
						FieldName:  engineEventConfig.GetFieldName(),
					},
					Configuration: &pubsub_datasource.NatsEventConfiguration{
						StreamConfiguration: streamConfiguration,
						Subjects:            eventConfiguration.GetSubjects(),
					},
				})
			}

			for _, eventConfiguration := range in.GetCustomEvents().GetKafka() {
				engineEventConfig := eventConfiguration.GetEngineEventConfiguration()
				eventTypeName := engineEventConfig.GetType().String()

				eventType, err := pubsub_datasource.EventTypeFromString(eventTypeName)
				if err != nil {
					return nil, fmt.Errorf("invalid event type %q for data source %q: %w", eventTypeName, in.Id, err)
				}

				eventConfigurations = append(eventConfigurations, pubsub_datasource.EventConfiguration{
					Metadata: &pubsub_datasource.EventMetadata{
						ProviderID: engineEventConfig.GetProviderId(),
						Type:       eventType,
						TypeName:   engineEventConfig.GetTypeName(),
						FieldName:  engineEventConfig.GetFieldName(),
					},
					Configuration: &pubsub_datasource.KafkaEventConfiguration{
						Topics: eventConfiguration.GetTopics(),
					},
				})
			}

			factory, err := l.resolver.ResolvePubsubFactory()
			if err != nil {
				return nil, err
			}

			out, err = plan.NewDataSourceConfiguration[pubsub_datasource.Configuration](
				in.Id,
				factory,
				l.dataSourceMetaData(in),
				pubsub_datasource.Configuration{
					Events: eventConfigurations,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error creating data source configuration for data source %s: %w", in.Id, err)
			}
		default:
			return nil, fmt.Errorf("unknown data source type %q", in.Kind)
		}

		outConfig.DataSources = append(outConfig.DataSources, out)
	}
	return &outConfig, nil
}