func()

in batching/batching.go [231:259]


func (b *Batcher) handleInput(ctx context.Context, tick <-chan time.Time) (exit bool, err error) {
	select {
	case data, ok := <-b.in:
		if !ok {
			return true, nil
		}
		if err := b.handleData(data); err != nil {
			metrics.Error(ctx)
			return false, err
		}

		if b.batchSize > 0 {
			if b.current.Len() == b.batchSize {
				b.emitter(ctx)
			} else if b.current.Len() > b.batchSize {
				b.log.Error("Bug: batch size exceeded in Batcher")
				b.emitter(ctx)
			}
		}
	case <-tick:
		if b.current.Len() == 0 {
			metrics.Success(ctx)
			return false, nil
		}
		b.emitter(ctx)
	}
	metrics.Success(ctx)
	return false, nil
}