router/pkg/pubsub/redis/engine_datasource.go (73 lines of code) (raw):

package redis import ( "bytes" "context" "encoding/json" "fmt" "io" "github.com/buger/jsonparser" "github.com/cespare/xxhash/v2" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" ) // SubscriptionEventConfiguration contains configuration for subscription events type SubscriptionEventConfiguration struct { ProviderID string `json:"providerId"` Channels []string `json:"channels"` } // PublishEventConfiguration contains configuration for publish events type PublishEventConfiguration struct { ProviderID string `json:"providerId"` Channel string `json:"channel"` Data json.RawMessage `json:"data"` } func (s *PublishEventConfiguration) MarshalJSONTemplate() (string, error) { return fmt.Sprintf(`{"channel":"%s", "data": %s, "providerId":"%s"}`, s.Channel, s.Data, s.ProviderID), nil } // SubscriptionDataSource implements resolve.SubscriptionDataSource for Redis type SubscriptionDataSource struct { pubSub Adapter } // UniqueRequestID computes a unique ID for the subscription request func (s *SubscriptionDataSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error { val, _, _, err := jsonparser.Get(input, "channels") if err != nil { return err } _, err = xxh.Write(val) if err != nil { return err } val, _, _, err = jsonparser.Get(input, "providerId") if err != nil { return err } _, err = xxh.Write(val) return err } // Start starts the subscription func (s *SubscriptionDataSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error { var subscriptionConfiguration SubscriptionEventConfiguration err := json.Unmarshal(input, &subscriptionConfiguration) if err != nil { return err } return s.pubSub.Subscribe(ctx.Context(), subscriptionConfiguration, updater) } // LoadInitialData implements the interface method (not used for this subscription type) func (s *SubscriptionDataSource) LoadInitialData(ctx context.Context) (initial []byte, err error) { return nil, nil } // PublishDataSource implements resolve.DataSource for Redis publishing type PublishDataSource struct { pubSub Adapter } // Load processes a request to publish to Redis func (s *PublishDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error { var publishConfiguration PublishEventConfiguration err := json.Unmarshal(input, &publishConfiguration) if err != nil { return err } if err := s.pubSub.Publish(ctx, publishConfiguration); err != nil { _, err = io.WriteString(out, `{"success": false}`) return err } _, err = io.WriteString(out, `{"success": true}`) return err } // LoadWithFiles implements resolve.DataSource.LoadWithFiles (not used for this type) func (s *PublishDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) (err error) { panic("not implemented") }