tattler.go (156 lines of code) (raw):

/*
Package tattler provides a way to read data from a source called a Reader that provides K8 objects
from a K8 object source (such as etcd or API Server watchlist), preprocess the data to add or remove
fields, and then send the data to a Processor that will process the data in some way (such as sending
it to a file, database or service).

Reader types can be found in the /reader directory. Processor types are left for the user to implement.

See the README.md for more information on how to use this package.
*/
package tattler

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/Azure/tattler/batching"
	"github.com/Azure/tattler/data"
	preprocess "github.com/Azure/tattler/internal/preprocess"
	"github.com/Azure/tattler/internal/routing"
	"github.com/Azure/tattler/internal/safety"
	"go.opentelemetry.io/otel/metric"

	batchingmetrics "github.com/Azure/tattler/internal/metrics/batching"
	readersmetrics "github.com/Azure/tattler/internal/metrics/readers"
)

// Reader defines the interface that must be implemented by all readers.
// We do not support a Reader that is not within this package.
type Reader interface {
	// SetOut sets the output channel that the reader must output on. Must return an error and be a no-op
	// if Run() has been called.
	SetOut(context.Context, chan data.Entry) error
	// Run starts the Reader processing. You may only call this once if Run() does not return an error.
	Run(context.Context) error
}

// PreProcessor is a function that processes data before it is sent to a processor. It must be thread-safe.
// This is where you would alter data before it is sent for processing. Any change here affects
// all processors.
type PreProcessor = preprocess.Processor

// Runner runs readers and sends the output through a series of data modifications and batching until
// it is sent to data processors.
type Runner struct { input chan data.Entry secrets *safety.Secrets batchOpts []batching.Option batcher *batching.Batcher router *routing.Batches readers []Reader preProcessors []PreProcessor logger *slog.Logger meterProvider metric.MeterProvider mu sync.Mutex started bool } // Option is an option for New(). type Option func(*Runner) error // WithLogger sets the logger. Defaults to slog.Default(). // You will not need to also use WithBatcherOptions(batching.WithLogger()), as this // will automatically set to the same logger. func WithLogger(l *slog.Logger) Option { return func(r *Runner) error { if l == nil { return fmt.Errorf("logger cannot be nil") } r.logger = l return nil } } // WithPreProcessor appends PreProcessors to the Runner. func WithPreProcessor(p ...PreProcessor) Option { return func(r *Runner) error { r.preProcessors = append(r.preProcessors, p...) return nil } } // WithBatcherOptions sets the options for the Batcher. func WithBatcherOptions(o ...batching.Option) Option { return func(r *Runner) error { r.batchOpts = append(r.batchOpts, o...) return nil } } // WithMeterProvider sets the meter provider with which to register metrics. // Defaults to nil, in which case metrics won't be registered. func WithMeterProvider(m metric.MeterProvider) Option { return func(r *Runner) error { if m == nil { return fmt.Errorf("meter cannot be nil") } r.meterProvider = m return nil } } // New constructs a new Runner. The input channel is the ouput of a Reader object. The batchTimespan // is the duration to wait before sending a batch of data to the processor. There is also a maximum of // 1000 entries that can be sent in a batch. This can be adjust by using WithBatcherOptions(batchingWithBatchSize()). 
func New(ctx context.Context, in chan data.Entry, batchTimespan time.Duration, options ...Option) (*Runner, error) { if in == nil { return nil, fmt.Errorf("input channel cannot be nil") } r := &Runner{ input: in, logger: slog.Default(), } for _, o := range options { if err := o(r); err != nil { return nil, err } } r.batchOpts = append(r.batchOpts, batching.WithLogger(r.logger)) if batchTimespan <= 0 { return nil, fmt.Errorf("batchTimespan must be greater than 0") } batchingIn := make(chan data.Entry, 1) routerIn := make(chan batching.Batches, 1) var secretsIn = in if r.preProcessors != nil { secretsIn = make(chan data.Entry, 1) _, err := preprocess.New(ctx, in, secretsIn, r.preProcessors, preprocess.WithLogger(r.logger)) if err != nil { return nil, err } } if r.meterProvider != nil { meter := r.meterProvider.Meter("tattler") if err := batchingmetrics.Init(meter); err != nil { return nil, err } if err := readersmetrics.Init(meter); err != nil { return nil, err } } secrets, err := safety.New(ctx, secretsIn, batchingIn, safety.WithLogger(r.logger)) if err != nil { return nil, err } batcher, err := batching.New(ctx, batchingIn, routerIn, batchTimespan, r.batchOpts...) if err != nil { return nil, err } router, err := routing.New(ctx, routerIn, routing.WithLogger(r.logger)) if err != nil { return nil, err } r.secrets = secrets r.batcher = batcher r.router = router return r, nil } // AddReader adds a reader's output channel as input to be processed. A Reader does not need to have // SetOut() or Run() called, as these are handled by AddReader() and Start(). You can add a reader // after Start() has been called. This allows staggering the start of readers. 
func (r *Runner) AddReader(ctx context.Context, reader Reader) error { r.mu.Lock() defer r.mu.Unlock() if err := reader.SetOut(ctx, r.input); err != nil { return fmt.Errorf("Reader(%T).SetOut(): %w", r, err) } if r.started { if err := reader.Run(ctx); err != nil { return fmt.Errorf("reader(%T): %w", reader, err) } } r.readers = append(r.readers, reader) return nil } // AddProcessor registers a processors input to receive Batches data. This cannot be called // after Start() has been called. func (r *Runner) AddProcessor(ctx context.Context, name string, in chan batching.Batches) error { r.mu.Lock() defer r.mu.Unlock() if in == nil { return fmt.Errorf("in cannot be nil") } if r.started { return fmt.Errorf("cannot add a processor after Runner has started") } return r.router.Register(ctx, name, in) } // Start starts the Runner. func (r *Runner) Start(ctx context.Context) error { r.mu.Lock() defer r.mu.Unlock() for _, reader := range r.readers { if err := reader.Run(ctx); err != nil { return fmt.Errorf("reader(%T): %w", reader, err) } } if err := r.router.Start(ctx); err != nil { return err } return nil }