in tattler.go [111:175]
func New(ctx context.Context, in chan data.Entry, batchTimespan time.Duration, options ...Option) (*Runner, error) {
if in == nil {
return nil, fmt.Errorf("input channel cannot be nil")
}
r := &Runner{
input: in,
logger: slog.Default(),
}
for _, o := range options {
if err := o(r); err != nil {
return nil, err
}
}
r.batchOpts = append(r.batchOpts, batching.WithLogger(r.logger))
if batchTimespan <= 0 {
return nil, fmt.Errorf("batchTimespan must be greater than 0")
}
batchingIn := make(chan data.Entry, 1)
routerIn := make(chan batching.Batches, 1)
var secretsIn = in
if r.preProcessors != nil {
secretsIn = make(chan data.Entry, 1)
_, err := preprocess.New(ctx, in, secretsIn, r.preProcessors, preprocess.WithLogger(r.logger))
if err != nil {
return nil, err
}
}
if r.meterProvider != nil {
meter := r.meterProvider.Meter("tattler")
if err := batchingmetrics.Init(meter); err != nil {
return nil, err
}
if err := readersmetrics.Init(meter); err != nil {
return nil, err
}
}
secrets, err := safety.New(ctx, secretsIn, batchingIn, safety.WithLogger(r.logger))
if err != nil {
return nil, err
}
batcher, err := batching.New(ctx, batchingIn, routerIn, batchTimespan, r.batchOpts...)
if err != nil {
return nil, err
}
router, err := routing.New(ctx, routerIn, routing.WithLogger(r.logger))
if err != nil {
return nil, err
}
r.secrets = secrets
r.batcher = batcher
r.router = router
return r, nil
}