collector/logs/sources/const.go (81 lines of code) (raw):

package sources import ( "context" "sync" "time" "github.com/Azure/adx-mon/collector/logs/engine" "github.com/Azure/adx-mon/collector/logs/types" ) type ConstSource struct { Value string FlushDuration time.Duration MaxBatchSize int outputQueue chan *types.LogBatch internalQueue chan *types.Log workerGenerator engine.WorkerCreatorFunc closeFn context.CancelFunc wg sync.WaitGroup } // TODO more variety of source values func NewConstSource(value string, flushDuration time.Duration, maxBatchSize int, workerGenerator engine.WorkerCreatorFunc) *ConstSource { return &ConstSource{ Value: value, FlushDuration: flushDuration, MaxBatchSize: maxBatchSize, workerGenerator: workerGenerator, outputQueue: make(chan *types.LogBatch, 1), internalQueue: make(chan *types.Log, 1000), } } func (s *ConstSource) Open(ctx context.Context) error { ctx, closeFn := context.WithCancel(ctx) s.closeFn = closeFn s.wg.Add(1) go func() { defer s.wg.Done() s.generate(ctx) }() config := engine.BatchConfig{ MaxBatchSize: s.MaxBatchSize, MaxBatchWait: s.FlushDuration, InputQueue: s.internalQueue, OutputQueue: s.outputQueue, AckGenerator: func(log *types.Log) func() { return func() { } }, } s.wg.Add(1) go func() { defer s.wg.Done() engine.BatchLogs(ctx, config) }() worker := s.workerGenerator("ConstSource", s.outputQueue) s.wg.Add(1) go func() { defer s.wg.Done() worker.Run() }() return nil } func (s *ConstSource) Close() error { s.closeFn() s.wg.Wait() return nil } func (s *ConstSource) Name() string { return "ConstSource" } func (s *ConstSource) generate(ctx context.Context) { for { log := types.LogPool.Get(1).(*types.Log) log.Reset() log.SetTimestamp(uint64(time.Now().UnixNano())) log.SetObservedTimestamp(uint64(time.Now().UnixNano())) log.SetBodyValue("message", s.Value) select { case <-ctx.Done(): return case s.internalQueue <- log: } } }