internal/routing/routing.go (103 lines of code) (raw):

/* Package routing routes batching.Batches received on an input channel to output receivers that wish to process the information. Usage: router, err := router.New(ctx, in) if err != nil { // Do something } if err := router.Register(ctx, "data handler name", outToCh); err != nil { // Do something } if err := router.Start(ctx); err != nil { // Do something } // Note: closing "in" will stop the router. */ package routing import ( "context" "errors" "fmt" "log/slog" "github.com/Azure/tattler/batching" "github.com/gostdlib/concurrency/prim/wait" ) type route struct { out chan batching.Batches name string } type routes []route // Batches routes batches to registered destinations. type Batches struct { input chan batching.Batches routes routes started bool log *slog.Logger } // Option is an optional argument to New(). type Option func(b *Batches) error // WithLogger sets Batches to use a custom Logger. func WithLogger(l *slog.Logger) Option { return func(b *Batches) error { if l == nil { return fmt.Errorf("WithLogger does not accept a nil *slog.Logger") } b.log = l return nil } } // New is the constructor for Batches. func New(ctx context.Context, input chan batching.Batches, options ...Option) (*Batches, error) { if input == nil { return nil, errors.New("routing.New: input channel cannot be nil") } b := &Batches{ input: input, routes: routes{}, log: slog.Default(), } for _, o := range options { if err := o(b); err != nil { return nil, err } } return b, nil } // Register registers a routeCh for data with a specific date.EntryType and ObjectType. // You may register the same combination for different routeCh. func (b *Batches) Register(ctx context.Context, name string, ch chan batching.Batches) error { if b.started { return fmt.Errorf("routing.Batches.Register: cannot Register a route after Start() is called") } if name == "" { return fmt.Errorf("routing.Batches.Register; cannot Register a route with an empty name") } if ch == nil { return fmt.Errorf("routing.Batches.Register: cannot Register a route with a nil channel") } b.routes = append(b.routes, route{name: name, out: ch}) return nil } // Exists returns true if a route with the given name exists. func (b *Batches) Exists(name string) bool { for _, r := range b.routes { if r.name == name { return true } } return false } // Start starts routing data coming from input. This can be stopped by closing the input channel. func (b *Batches) Start(ctx context.Context) error { if len(b.routes) == 0 { return errors.New("routing.Batches: cannot start without registered routes") } ctx = context.WithoutCancel(ctx) b.started = true g := wait.Group{} g.Go(ctx, func(ctx context.Context) error { b.handleInput(ctx) return nil }) go func() { g.Wait(ctx) for _, r := range b.routes { close(r.out) } }() return nil } // handleInput receives data on the input channel and pushes it to the appropriate receivers. func (b *Batches) handleInput(ctx context.Context) { for batches := range b.input { for _, r := range b.routes { if err := b.push(ctx, r, batches); err != nil { b.log.Error(err.Error()) } } } } // push pushes a batches to a route. func (b *Batches) push(ctx context.Context, r route, batches batching.Batches) error { select { case r.out <- batches: default: return fmt.Errorf("routing.Batches.push: dropping data to slow receiver(%s)", r.name) } return nil }