router/pkg/pubsub/redis/engine_datasource_factory.go (88 lines of code) (raw):
package redis
import (
"encoding/json"
"fmt"
"slices"
"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
)
type EventType int
const (
EventTypePublish EventType = iota
EventTypeSubscribe
)
// EngineDataSourceFactory implements the datasource.EngineDataSourceFactory interface for Redis
type EngineDataSourceFactory struct {
RedisAdapter Adapter
fieldName string
eventType EventType
channels []string
providerId string
}
func (c *EngineDataSourceFactory) GetFieldName() string {
return c.fieldName
}
// ResolveDataSource returns the appropriate data source based on the event type
func (c *EngineDataSourceFactory) ResolveDataSource() (resolve.DataSource, error) {
var dataSource resolve.DataSource
eventType := c.eventType
switch eventType {
case EventTypePublish:
dataSource = &PublishDataSource{
pubSub: c.RedisAdapter,
}
default:
return nil, fmt.Errorf("failed to configure fetch: invalid event type \"%d\" for Redis", eventType)
}
return dataSource, nil
}
// ResolveDataSourceInput builds the input for the data source
func (c *EngineDataSourceFactory) ResolveDataSourceInput(eventData []byte) (string, error) {
channels := c.channels
if len(channels) != 1 {
return "", fmt.Errorf("publish events should define one channel but received %d", len(channels))
}
channel := channels[0]
providerId := c.providerId
evtCfg := PublishEventConfiguration{
ProviderID: providerId,
Channel: channel,
Data: eventData,
}
return evtCfg.MarshalJSONTemplate()
}
// ResolveDataSourceSubscription returns the subscription data source
func (c *EngineDataSourceFactory) ResolveDataSourceSubscription() (resolve.SubscriptionDataSource, error) {
return &SubscriptionDataSource{
pubSub: c.RedisAdapter,
}, nil
}
// ResolveDataSourceSubscriptionInput builds the input for the subscription data source
func (c *EngineDataSourceFactory) ResolveDataSourceSubscriptionInput() (string, error) {
evtCfg := SubscriptionEventConfiguration{
ProviderID: c.providerId,
Channels: c.channels,
}
object, err := json.Marshal(evtCfg)
if err != nil {
return "", fmt.Errorf("failed to marshal event subscription configuration")
}
return string(object), nil
}
// TransformEventData transforms the event data using the extract function
func (c *EngineDataSourceFactory) TransformEventData(extractFn datasource.ArgumentTemplateCallback) error {
switch c.eventType {
case EventTypePublish:
extractedChannel, err := extractFn(c.channels[0])
if err != nil {
return fmt.Errorf("unable to parse channel with id %s", c.channels[0])
}
c.channels = []string{extractedChannel}
case EventTypeSubscribe:
extractedChannels := make([]string, 0, len(c.channels))
for _, rawChannel := range c.channels {
extractedChannel, err := extractFn(rawChannel)
if err != nil {
return nil
}
extractedChannels = append(extractedChannels, extractedChannel)
}
slices.Sort(extractedChannels)
c.channels = extractedChannels
}
return nil
}