collector/logs/engine/batch.go (48 lines of code) (raw):

package engine import ( "context" "time" "github.com/Azure/adx-mon/collector/logs/types" ) type BatchConfig struct { MaxBatchSize int MaxBatchWait time.Duration InputQueue <-chan *types.Log OutputQueue chan<- *types.LogBatch AckGenerator func(log *types.Log) func() } func BatchLogs(ctx context.Context, config BatchConfig) { ticker := time.NewTicker(config.MaxBatchWait) defer ticker.Stop() currentBatch := types.LogBatchPool.Get(1024).(*types.LogBatch) currentBatch.Reset() for { select { case <-ctx.Done(): if len(currentBatch.Logs) != 0 { flush(config, currentBatch) } close(config.OutputQueue) return case <-ticker.C: if len(currentBatch.Logs) != 0 { flush(config, currentBatch) currentBatch = types.LogBatchPool.Get(1024).(*types.LogBatch) currentBatch.Reset() } case msg := <-config.InputQueue: currentBatch.Logs = append(currentBatch.Logs, msg) if len(currentBatch.Logs) >= config.MaxBatchSize { flush(config, currentBatch) currentBatch = types.LogBatchPool.Get(1024).(*types.LogBatch) currentBatch.Reset() ticker.Reset(config.MaxBatchWait) } } } } func flush(config BatchConfig, currentBatch *types.LogBatch) { lastMsg := currentBatch.Logs[len(currentBatch.Logs)-1] currentBatch.Ack = config.AckGenerator(lastMsg) config.OutputQueue <- currentBatch }