in batching/batching.go [231:259]
// handleInput performs a single step of the batcher loop by waiting on either
// the input channel b.in or the supplied tick channel.
//
// On an input item it passes the item to b.handleData and, when b.batchSize is
// positive, calls b.emitter once the current batch has reached that size. On a
// tick it calls b.emitter only if the current batch is non-empty.
//
// It returns exit == true (with a nil error) only when b.in has been closed.
// An error from b.handleData is returned unchanged after metrics.Error is
// recorded; every other non-exit path records metrics.Success and returns
// (false, nil).
func (b *Batcher) handleInput(ctx context.Context, tick <-chan time.Time) (exit bool, err error) {
	select {
	case item, open := <-b.in:
		if !open {
			// Input channel closed: signal the caller to stop. No metric is
			// recorded on this path.
			return true, nil
		}
		if handleErr := b.handleData(item); handleErr != nil {
			metrics.Error(ctx)
			return false, handleErr
		}
		// A non-positive batchSize disables size-triggered emission.
		if b.batchSize > 0 {
			switch {
			case b.current.Len() == b.batchSize:
				b.emitter(ctx)
			case b.current.Len() > b.batchSize:
				// Not expected to happen; log it and emit anyway so the
				// oversized batch does not keep growing.
				b.log.Error("Bug: batch size exceeded in Batcher")
				b.emitter(ctx)
			}
		}
	case <-tick:
		// Nothing to emit on a tick when the current batch is empty.
		if b.current.Len() != 0 {
			b.emitter(ctx)
		}
	}
	metrics.Success(ctx)
	return false, nil
}