collector/logs/sinks/counting.go (65 lines of code) (raw):

package sinks import ( "context" "fmt" "sync" "github.com/Azure/adx-mon/collector/logs/types" ) // CountingSink is intended for testing purposes to consume at least n logs, then close the Done channel. type CountingSink struct { expectedCount int64 currentCount int64 latest *types.Log lock sync.Mutex done bool doneChannel chan int64 closed bool } func NewCountingSink(expectedCount int64) *CountingSink { return &CountingSink{ expectedCount: expectedCount, currentCount: 0, latest: nil, done: false, doneChannel: make(chan int64, 1), closed: false, } } func (s *CountingSink) Open(ctx context.Context) error { return nil } func (s *CountingSink) Send(ctx context.Context, batch *types.LogBatch) error { s.lock.Lock() defer s.lock.Unlock() if s.closed { return fmt.Errorf("CountingSink is closed") } s.currentCount += int64(len(batch.Logs)) s.latest = batch.Logs[len(batch.Logs)-1] batch.Ack() if !s.done && s.currentCount >= s.expectedCount { s.done = true s.doneChannel <- s.currentCount close(s.doneChannel) } return nil } func (s *CountingSink) Close() error { s.lock.Lock() defer s.lock.Unlock() if s.closed { return fmt.Errorf("CountingSink is already closed") } s.closed = true return nil } func (s *CountingSink) Name() string { return "CountingSink" } func (s *CountingSink) DoneChan() chan int64 { return s.doneChannel } func (s *CountingSink) Latest() *types.Log { s.lock.Lock() defer s.lock.Unlock() return s.latest }