router/pkg/pubsub/pubsub.go (169 lines of code) (raw):

package pubsub import ( "context" "fmt" "slices" "strconv" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/pkg/config" pubsub_datasource "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" "github.com/wundergraph/cosmo/router/pkg/pubsub/kafka" "github.com/wundergraph/cosmo/router/pkg/pubsub/nats" "github.com/wundergraph/cosmo/router/pkg/pubsub/redis" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" "go.uber.org/zap" ) type DataSourceConfigurationWithMetadata struct { Configuration *nodev1.DataSourceConfiguration Metadata *plan.DataSourceMetadata } type GetID interface { GetID() string } type GetEngineEventConfiguration interface { GetEngineEventConfiguration() *nodev1.EngineEventConfiguration } type EngineEventConfiguration interface { GetTypeName() string GetFieldName() string GetProviderId() string } type ProviderNotDefinedError struct { ProviderID string ProviderTypeID string } type dsConfAndEvents[E GetEngineEventConfiguration] struct { dsConf *DataSourceConfigurationWithMetadata events []E } func (e *ProviderNotDefinedError) Error() string { return fmt.Sprintf("%s provider with ID %s is not defined", e.ProviderTypeID, e.ProviderID) } // BuildProvidersAndDataSources is a generic function that builds providers and data sources for the given // EventsConfiguration and DataSourceConfigurationWithMetadata func BuildProvidersAndDataSources( ctx context.Context, config config.EventsConfiguration, logger *zap.Logger, dsConfs []DataSourceConfigurationWithMetadata, hostName string, routerListenAddr string, ) ([]pubsub_datasource.Provider, []plan.DataSource, error) { var pubSubProviders []pubsub_datasource.Provider var outs []plan.DataSource // initialize Kafka providers and data sources kafkaBuilder := kafka.NewProviderBuilder(ctx, logger, hostName, routerListenAddr) kafkaDsConfsWithEvents := []dsConfAndEvents[*nodev1.KafkaEventConfiguration]{} for _, dsConf := range dsConfs { kafkaDsConfsWithEvents = append(kafkaDsConfsWithEvents, dsConfAndEvents[*nodev1.KafkaEventConfiguration]{ dsConf: &dsConf, events: dsConf.Configuration.GetCustomEvents().GetKafka(), }) } kafkaPubSubProviders, kafkaOuts, err := build(ctx, kafkaBuilder, config.Providers.Kafka, kafkaDsConfsWithEvents) if err != nil { return nil, nil, err } pubSubProviders = append(pubSubProviders, kafkaPubSubProviders...) outs = append(outs, kafkaOuts...) // initialize NATS providers and data sources natsBuilder := nats.NewProviderBuilder(ctx, logger, hostName, routerListenAddr) natsDsConfsWithEvents := []dsConfAndEvents[*nodev1.NatsEventConfiguration]{} for _, dsConf := range dsConfs { natsDsConfsWithEvents = append(natsDsConfsWithEvents, dsConfAndEvents[*nodev1.NatsEventConfiguration]{ dsConf: &dsConf, events: dsConf.Configuration.GetCustomEvents().GetNats(), }) } natsPubSubProviders, natsOuts, err := build(ctx, natsBuilder, config.Providers.Nats, natsDsConfsWithEvents) if err != nil { return nil, nil, err } pubSubProviders = append(pubSubProviders, natsPubSubProviders...) outs = append(outs, natsOuts...) // initialize Redis providers and data sources redisBuilder := redis.NewProviderBuilder(ctx, logger, hostName, routerListenAddr) redisDsConfsWithEvents := []dsConfAndEvents[*nodev1.RedisEventConfiguration]{} for _, dsConf := range dsConfs { redisDsConfsWithEvents = append(redisDsConfsWithEvents, dsConfAndEvents[*nodev1.RedisEventConfiguration]{ dsConf: &dsConf, events: dsConf.Configuration.GetCustomEvents().GetRedis(), }) } redisPubSubProviders, redisOuts, err := build(ctx, redisBuilder, config.Providers.Redis, redisDsConfsWithEvents) if err != nil { return nil, nil, err } pubSubProviders = append(pubSubProviders, redisPubSubProviders...) outs = append(outs, redisOuts...) return pubSubProviders, outs, nil } func build[P GetID, E GetEngineEventConfiguration](ctx context.Context, builder pubsub_datasource.ProviderBuilder[P, E], providersData []P, dsConfs []dsConfAndEvents[E]) ([]pubsub_datasource.Provider, []plan.DataSource, error) { var pubSubProviders []pubsub_datasource.Provider var outs []plan.DataSource // check used providers usedProviderIds := []string{} for _, dsConf := range dsConfs { for _, event := range dsConf.events { if !slices.Contains(usedProviderIds, event.GetEngineEventConfiguration().GetProviderId()) { usedProviderIds = append(usedProviderIds, event.GetEngineEventConfiguration().GetProviderId()) } } } // initialize providers if used providerIds := []string{} for _, providerData := range providersData { if !slices.Contains(usedProviderIds, providerData.GetID()) { continue } provider, err := builder.BuildProvider(providerData) if err != nil { return nil, nil, err } pubSubProviders = append(pubSubProviders, provider) providerIds = append(providerIds, provider.ID()) } // check if all used providers are initialized for _, providerId := range usedProviderIds { if !slices.Contains(providerIds, providerId) { return pubSubProviders, nil, &ProviderNotDefinedError{ ProviderID: providerId, ProviderTypeID: builder.TypeID(), } } } // build data sources for each event for _, dsConf := range dsConfs { for i, event := range dsConf.events { plannerConfig := pubsub_datasource.NewPlannerConfig(builder, event) out, err := plan.NewDataSourceConfiguration( dsConf.dsConf.Configuration.Id+"-"+builder.TypeID()+"-"+strconv.Itoa(i), pubsub_datasource.NewPlannerFactory(ctx, plannerConfig), getFilteredDataSourceMetadata(event.GetEngineEventConfiguration(), dsConf.dsConf.Metadata), plannerConfig, ) if err != nil { return nil, nil, err } outs = append(outs, out) } } return pubSubProviders, outs, nil } func getFilteredDataSourceMetadata[E EngineEventConfiguration](event E, dsMeta *plan.DataSourceMetadata) *plan.DataSourceMetadata { // find used root types and fields rootFields := make(map[string][]string) typeName := event.GetTypeName() fieldName := event.GetFieldName() if _, ok := rootFields[typeName]; !ok { rootFields[typeName] = []string{} } rootFields[typeName] = append(rootFields[typeName], fieldName) // filter dsMeta.RootNodes newRootNodes := []plan.TypeField{} for _, node := range dsMeta.RootNodes { newRootNode := plan.TypeField{ TypeName: node.TypeName, FieldNames: []string{}, ExternalFieldNames: node.ExternalFieldNames, } for _, fieldName := range node.FieldNames { if slices.Contains(rootFields[node.TypeName], fieldName) { newRootNode.FieldNames = append(newRootNode.FieldNames, fieldName) } } newRootNodes = append(newRootNodes, newRootNode) } newDsMeta := *dsMeta newDsMeta.RootNodes = newRootNodes return &newDsMeta }