in router/core/factoryresolver.go [222:470]
// Load converts the serialized engine configuration into a plan.Configuration.
//
// It copies the default flush interval, converts every field and type
// configuration, and builds one plan.DataSource for each entry of
// engineConfig.DatasourceConfigurations (static, GraphQL or pub/sub).
// subgraphs is used to look up the data source name and, together with
// routerEngineConfig.Headers, the header rules matching the subscription URL.
//
// Load stops at the first data source that cannot be built and returns the
// error. That includes an unknown data source kind, a GraphQL data source
// without its GraphQL or fetch configuration, and a pub/sub event without its
// engine event configuration. A GraphQL data source without subscription or
// federation settings is accepted and gets the defaults (subscription URL
// equal to the fetch URL, no SSE, "auto" WebSocket subprotocol, federation
// disabled).
func (l *Loader) Load(engineConfig *nodev1.EngineConfiguration, subgraphs []*nodev1.Subgraph, routerEngineConfig *RouterEngineConfiguration) (*plan.Configuration, error) {
	var outConfig plan.Configuration
	outConfig.DefaultFlushIntervalMillis = engineConfig.DefaultFlushInterval

	// Field configurations: argument sources, authorization flag and subscription filter.
	for _, configuration := range engineConfig.FieldConfigurations {
		var args []plan.ArgumentConfiguration
		for _, argumentConfiguration := range configuration.ArgumentsConfiguration {
			arg := plan.ArgumentConfiguration{
				Name: argumentConfiguration.Name,
			}
			// Any other source type leaves arg.SourceType at its zero value.
			switch argumentConfiguration.SourceType {
			case nodev1.ArgumentSource_FIELD_ARGUMENT:
				arg.SourceType = plan.FieldArgumentSource
			case nodev1.ArgumentSource_OBJECT_FIELD:
				arg.SourceType = plan.ObjectFieldSource
			}
			args = append(args, arg)
		}
		fieldConfig := plan.FieldConfiguration{
			TypeName:                    configuration.TypeName,
			FieldName:                   configuration.FieldName,
			Arguments:                   args,
			HasAuthorizationRule:        l.fieldHasAuthorizationRule(configuration),
			SubscriptionFilterCondition: mapProtoFilterToPlanFilter(configuration.SubscriptionFilterCondition, &plan.SubscriptionFilterCondition{}),
		}
		outConfig.Fields = append(outConfig.Fields, fieldConfig)
	}

	// Type configurations (type renames).
	for _, configuration := range engineConfig.TypeConfigurations {
		outConfig.Types = append(outConfig.Types, plan.TypeConfiguration{
			TypeName: configuration.TypeName,
			RenameTo: configuration.RenameTo,
		})
	}

	// Data sources: one plan.DataSource per configured entry, built according to its kind.
	for _, in := range engineConfig.DatasourceConfigurations {
		var out plan.DataSource
		switch in.Kind {
		case nodev1.DataSourceKind_STATIC:
			factory, err := l.resolver.ResolveStaticFactory()
			if err != nil {
				return nil, err
			}
			out, err = plan.NewDataSourceConfiguration[staticdatasource.Configuration](
				in.Id,
				factory,
				l.dataSourceMetaData(in),
				staticdatasource.Configuration{
					// Getters are nil-safe, so a missing static section does not panic here.
					Data: config.LoadStringVariable(in.GetCustomStatic().GetData()),
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error creating data source configuration for data source %s: %w", in.Id, err)
			}
		case nodev1.DataSourceKind_GRAPHQL:
			// The GraphQL and fetch sections are mandatory: report their absence
			// as an error instead of dereferencing a nil message.
			customGraphql := in.GetCustomGraphql()
			if customGraphql == nil {
				return nil, fmt.Errorf("missing GraphQL configuration for data source %s", in.Id)
			}
			fetchConfig := customGraphql.GetFetch()
			if fetchConfig == nil {
				return nil, fmt.Errorf("missing fetch configuration for data source %s", in.Id)
			}
			// Subscription and federation sections may be nil; they are only
			// read through nil-safe getters or behind explicit nil checks.
			subscriptionConfig := customGraphql.GetSubscription()
			federationConfig := customGraphql.GetFederation()

			header := http.Header{}
			for s, httpHeader := range fetchConfig.Header {
				for _, value := range httpHeader.GetValues() {
					header.Add(s, config.LoadStringVariable(value))
				}
			}

			fetchUrl := config.LoadStringVariable(fetchConfig.GetUrl())
			// Subscriptions go to the fetch URL unless a dedicated URL is configured.
			subscriptionUrl := config.LoadStringVariable(subscriptionConfig.GetUrl())
			if subscriptionUrl == "" {
				subscriptionUrl = fetchUrl
			}

			customScalarTypeFields := make([]graphql_datasource.SingleTypeField, len(customGraphql.CustomScalarTypeFields))
			for i, v := range customGraphql.CustomScalarTypeFields {
				customScalarTypeFields[i] = graphql_datasource.SingleTypeField{
					TypeName:  v.TypeName,
					FieldName: v.FieldName,
				}
			}

			graphqlSchema, err := l.LoadInternedString(engineConfig, customGraphql.GetUpstreamSchema())
			if err != nil {
				return nil, fmt.Errorf("could not load GraphQL schema for data source %s: %w", in.Id, err)
			}

			// Both flags default to false, i.e. subscriptions over WebSocket.
			var subscriptionUseSSE bool
			var subscriptionSSEMethodPost bool
			switch {
			case subscriptionConfig == nil:
				// No subscription settings: keep the defaults.
			case subscriptionConfig.Protocol != nil:
				switch *subscriptionConfig.Protocol {
				case common.GraphQLSubscriptionProtocol_GRAPHQL_SUBSCRIPTION_PROTOCOL_WS:
					subscriptionUseSSE = false
					subscriptionSSEMethodPost = false
				case common.GraphQLSubscriptionProtocol_GRAPHQL_SUBSCRIPTION_PROTOCOL_SSE:
					subscriptionUseSSE = true
					subscriptionSSEMethodPost = false
				case common.GraphQLSubscriptionProtocol_GRAPHQL_SUBSCRIPTION_PROTOCOL_SSE_POST:
					subscriptionUseSSE = true
					subscriptionSSEMethodPost = true
				}
			case subscriptionConfig.UseSSE != nil:
				// Old style config: only consulted when no protocol is set.
				subscriptionUseSSE = *subscriptionConfig.UseSSE
			}

			wsSubprotocol := "auto"
			if subscriptionConfig != nil && subscriptionConfig.WebsocketSubprotocol != nil {
				switch *subscriptionConfig.WebsocketSubprotocol {
				case common.GraphQLWebsocketSubprotocol_GRAPHQL_WEBSOCKET_SUBPROTOCOL_WS:
					wsSubprotocol = "graphql-ws"
				case common.GraphQLWebsocketSubprotocol_GRAPHQL_WEBSOCKET_SUBPROTOCOL_TRANSPORT_WS:
					wsSubprotocol = "graphql-transport-ws"
				case common.GraphQLWebsocketSubprotocol_GRAPHQL_WEBSOCKET_SUBPROTOCOL_AUTO:
					wsSubprotocol = "auto"
				}
			}

			// Header rules are looked up by the subscription URL and turned into
			// the client header names / patterns handed to the subscription configuration.
			dataSourceRules := FetchURLRules(routerEngineConfig.Headers, subgraphs, subscriptionUrl)
			forwardedClientHeaders, forwardedClientRegexps, err := PropagatedHeaders(dataSourceRules)
			if err != nil {
				return nil, fmt.Errorf("error parsing header rules for data source %s: %w", in.Id, err)
			}

			schemaConfiguration, err := graphql_datasource.NewSchemaConfiguration(
				graphqlSchema,
				&graphql_datasource.FederationConfiguration{
					Enabled:    federationConfig.GetEnabled(),
					ServiceSDL: federationConfig.GetServiceSdl(),
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error creating schema configuration for data source %s: %w", in.Id, err)
			}

			customConfiguration, err := graphql_datasource.NewConfiguration(graphql_datasource.ConfigurationInput{
				Fetch: &graphql_datasource.FetchConfiguration{
					URL:    fetchUrl,
					Method: fetchConfig.Method.String(),
					Header: header,
				},
				Subscription: &graphql_datasource.SubscriptionConfiguration{
					URL:                                     subscriptionUrl,
					UseSSE:                                  subscriptionUseSSE,
					SSEMethodPost:                           subscriptionSSEMethodPost,
					ForwardedClientHeaderNames:              forwardedClientHeaders,
					ForwardedClientHeaderRegularExpressions: forwardedClientRegexps,
					WsSubProtocol:                           wsSubprotocol,
				},
				SchemaConfiguration:    schemaConfiguration,
				CustomScalarTypeFields: customScalarTypeFields,
			})
			if err != nil {
				return nil, fmt.Errorf("error creating custom configuration for data source %s: %w", in.Id, err)
			}

			dataSourceName := l.subgraphName(subgraphs, in.Id)
			factory, err := l.resolver.ResolveGraphqlFactory(dataSourceName)
			if err != nil {
				return nil, err
			}
			out, err = plan.NewDataSourceConfigurationWithName[graphql_datasource.Configuration](
				in.Id,
				dataSourceName,
				factory,
				l.dataSourceMetaData(in),
				customConfiguration,
			)
			if err != nil {
				return nil, fmt.Errorf("error creating data source configuration for data source %s: %w", in.Id, err)
			}
		case nodev1.DataSourceKind_PUBSUB:
			var eventConfigurations []pubsub_datasource.EventConfiguration

			// NATS events, optionally bound to a stream/consumer.
			for _, eventConfiguration := range in.GetCustomEvents().GetNats() {
				engineEvent := eventConfiguration.GetEngineEventConfiguration()
				if engineEvent == nil {
					return nil, fmt.Errorf("missing engine event configuration for data source %q", in.Id)
				}
				eventTypeName := engineEvent.Type.String()
				eventType, err := pubsub_datasource.EventTypeFromString(eventTypeName)
				if err != nil {
					return nil, fmt.Errorf("invalid event type %q for data source %q: %w", eventTypeName, in.Id, err)
				}
				var streamConfiguration *pubsub_datasource.NatsStreamConfiguration
				if eventConfiguration.StreamConfiguration != nil {
					streamConfiguration = &pubsub_datasource.NatsStreamConfiguration{
						Consumer:                  eventConfiguration.StreamConfiguration.GetConsumerName(),
						StreamName:                eventConfiguration.StreamConfiguration.GetStreamName(),
						ConsumerInactiveThreshold: eventConfiguration.StreamConfiguration.GetConsumerInactiveThreshold(),
					}
				}
				eventConfigurations = append(eventConfigurations, pubsub_datasource.EventConfiguration{
					Metadata: &pubsub_datasource.EventMetadata{
						ProviderID: engineEvent.GetProviderId(),
						Type:       eventType,
						TypeName:   engineEvent.GetTypeName(),
						FieldName:  engineEvent.GetFieldName(),
					},
					Configuration: &pubsub_datasource.NatsEventConfiguration{
						StreamConfiguration: streamConfiguration,
						Subjects:            eventConfiguration.GetSubjects(),
					},
				})
			}

			// Kafka events.
			for _, eventConfiguration := range in.GetCustomEvents().GetKafka() {
				engineEvent := eventConfiguration.GetEngineEventConfiguration()
				if engineEvent == nil {
					return nil, fmt.Errorf("missing engine event configuration for data source %q", in.Id)
				}
				eventTypeName := engineEvent.Type.String()
				eventType, err := pubsub_datasource.EventTypeFromString(eventTypeName)
				if err != nil {
					return nil, fmt.Errorf("invalid event type %q for data source %q: %w", eventTypeName, in.Id, err)
				}
				eventConfigurations = append(eventConfigurations, pubsub_datasource.EventConfiguration{
					Metadata: &pubsub_datasource.EventMetadata{
						ProviderID: engineEvent.GetProviderId(),
						Type:       eventType,
						TypeName:   engineEvent.GetTypeName(),
						FieldName:  engineEvent.GetFieldName(),
					},
					Configuration: &pubsub_datasource.KafkaEventConfiguration{
						Topics: eventConfiguration.GetTopics(),
					},
				})
			}

			factory, err := l.resolver.ResolvePubsubFactory()
			if err != nil {
				return nil, err
			}
			out, err = plan.NewDataSourceConfiguration[pubsub_datasource.Configuration](
				in.Id,
				factory,
				l.dataSourceMetaData(in),
				pubsub_datasource.Configuration{
					Events: eventConfigurations,
				},
			)
			if err != nil {
				return nil, fmt.Errorf("error creating data source configuration for data source %s: %w", in.Id, err)
			}
		default:
			return nil, fmt.Errorf("unknown data source type %q", in.Kind)
		}
		outConfig.DataSources = append(outConfig.DataSources, out)
	}
	return &outConfig, nil
}