router/pkg/pubsub/redis/adapter.go (120 lines of code) (raw):
package redis
import (
"context"
"fmt"
"sync"
rd "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/redis"
"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"go.uber.org/zap"
)
// Adapter defines the methods that a Redis adapter should implement
type Adapter interface {
// Subscribe subscribes to the given events and sends updates to the updater
Subscribe(ctx context.Context, event SubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error
// Publish publishes the given event to the specified channel
Publish(ctx context.Context, event PublishEventConfiguration) error
// Startup initializes the adapter
Startup(ctx context.Context) error
// Shutdown gracefully shuts down the adapter
Shutdown(ctx context.Context) error
}
func NewProviderAdapter(ctx context.Context, logger *zap.Logger, urls []string, clusterEnabled bool) Adapter {
ctx, cancel := context.WithCancel(ctx)
return &ProviderAdapter{
ctx: ctx,
cancel: cancel,
logger: logger,
urls: urls,
clusterEnabled: clusterEnabled,
}
}
type ProviderAdapter struct {
ctx context.Context
cancel context.CancelFunc
conn rd.RDCloser
logger *zap.Logger
closeWg sync.WaitGroup
urls []string
clusterEnabled bool
}
func (p *ProviderAdapter) Startup(ctx context.Context) error {
rdCloser, err := rd.NewRedisCloser(&rd.RedisCloserOptions{
Logger: p.logger,
URLs: p.urls,
ClusterEnabled: p.clusterEnabled,
})
if err != nil {
return err
}
p.conn = rdCloser
return nil
}
func (p *ProviderAdapter) Shutdown(ctx context.Context) error {
if p.conn == nil {
return nil
}
// Cancel the context to stop the subscriptions
p.cancel()
// Wait for the subscriptions to be closed
p.closeWg.Wait()
// Close the connection
return p.conn.Close()
}
func (p *ProviderAdapter) Subscribe(ctx context.Context, event SubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error {
log := p.logger.With(
zap.String("provider_id", event.ProviderID),
zap.String("method", "subscribe"),
zap.Strings("channels", event.Channels),
)
sub := p.conn.PSubscribe(ctx, event.Channels...)
msgChan := sub.Channel()
cleanup := func() {
err := sub.PUnsubscribe(ctx, event.Channels...)
if err != nil {
log.Error(fmt.Sprintf("error unsubscribing from redis for topics %v", event.Channels), zap.Error(err))
}
}
p.closeWg.Add(1)
go func() {
defer p.closeWg.Done()
for {
select {
case msg, ok := <-msgChan:
if !ok {
log.Debug("subscription closed, stopping")
return
}
if msg == nil {
log.Debug("empty message received on subscription update, skipping")
return
}
log.Debug("subscription update", zap.String("message_channel", msg.Channel), zap.String("data", msg.Payload))
updater.Update([]byte(msg.Payload))
case <-p.ctx.Done():
// When the application context is done, we stop the subscription if it is not already done
log.Debug("application context done, stopping subscription")
cleanup()
return
case <-ctx.Done():
// When the subscription context is done, we stop the subscription if it is not already done
log.Debug("subscription context done, stopping subscription")
cleanup()
return
}
}
}()
return nil
}
func (p *ProviderAdapter) Publish(ctx context.Context, event PublishEventConfiguration) error {
log := p.logger.With(
zap.String("provider_id", event.ProviderID),
zap.String("method", "publish"),
zap.String("channel", event.Channel),
)
log.Debug("publish", zap.ByteString("data", event.Data))
data, dataErr := event.Data.MarshalJSON()
if dataErr != nil {
log.Error("error marshalling data", zap.Error(dataErr))
return datasource.NewError("error marshalling data", dataErr)
}
if p.conn == nil {
return datasource.NewError("redis connection not initialized", nil)
}
intCmd := p.conn.Publish(ctx, event.Channel, data)
if intCmd.Err() != nil {
log.Error("publish error", zap.Error(intCmd.Err()))
return datasource.NewError(fmt.Sprintf("error publishing to Redis PubSub channel %s", event.Channel), intCmd.Err())
}
return nil
}