router/pkg/pubsub/redis/provider_builder.go (59 lines of code) (raw):

package redis import ( "context" "fmt" nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1" "github.com/wundergraph/cosmo/router/pkg/config" "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource" "go.uber.org/zap" ) const providerTypeID = "redis" // ProviderBuilder builds Redis PubSub providers type ProviderBuilder struct { ctx context.Context logger *zap.Logger hostName string routerListenAddr string adapters map[string]Adapter } // NewProviderBuilder creates a new Redis PubSub provider builder func NewProviderBuilder( ctx context.Context, logger *zap.Logger, hostName string, routerListenAddr string, ) *ProviderBuilder { return &ProviderBuilder{ ctx: ctx, logger: logger, hostName: hostName, routerListenAddr: routerListenAddr, adapters: make(map[string]Adapter), } } // TypeID returns the provider type ID func (b *ProviderBuilder) TypeID() string { return providerTypeID } // DataSource creates a Redis PubSub data source for the given event configuration func (b *ProviderBuilder) BuildEngineDataSourceFactory(data *nodev1.RedisEventConfiguration) (datasource.EngineDataSourceFactory, error) { providerId := data.GetEngineEventConfiguration().GetProviderId() var eventType EventType switch data.GetEngineEventConfiguration().GetType() { case nodev1.EventType_PUBLISH: eventType = EventTypePublish case nodev1.EventType_SUBSCRIBE: eventType = EventTypeSubscribe default: return nil, fmt.Errorf("unsupported event type: %s", data.GetEngineEventConfiguration().GetType()) } return &EngineDataSourceFactory{ RedisAdapter: b.adapters[providerId], fieldName: data.GetEngineEventConfiguration().GetFieldName(), eventType: eventType, channels: data.GetChannels(), providerId: providerId, }, nil } // Providers returns the Redis PubSub providers for the given provider IDs func (b *ProviderBuilder) BuildProvider(provider config.RedisEventSource) (datasource.Provider, error) { adapter := NewProviderAdapter(b.ctx, b.logger, provider.URLs, provider.ClusterEnabled) pubSubProvider := datasource.NewPubSubProvider(provider.ID, providerTypeID, adapter, b.logger) b.adapters[provider.ID] = adapter return pubSubProvider, nil }