collector/logs/engine/worker.go (75 lines of code) (raw):
package engine
import (
"context"
"sync"
"time"
"github.com/Azure/adx-mon/collector/logs/types"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/logger"
)
// worker carries everything needed to move log batches from one source
// through a chain of transforms and out to a set of sinks.
type worker struct {
// SourceName identifies the log source; it is used in warning messages and
// as a label value on the dropped/sent metrics.
SourceName string
// Input is the channel Run receives batches from. Run returns once it is
// closed.
Input <-chan *types.LogBatch
// Transforms are applied to each batch in slice order before it reaches
// any sink.
Transforms []types.Transformer
// Sinks each receive every successfully transformed batch; they are
// invoked concurrently, one goroutine per sink.
Sinks []types.Sink
// BatchTimeout is the timeout of the context created for each batch in
// Run; that context is passed to every transform and sink.
BatchTimeout time.Duration
}
// WorkerCreatorFunc is the signature of a factory that builds a worker from a
// source name and the channel that worker should read batches from.
type WorkerCreatorFunc func(string, <-chan *types.LogBatch) *worker
// WorkerCreator returns a factory that builds workers sharing the given
// transforms and sinks. Each worker produced by the factory is bound to the
// source name and input channel supplied at call time and uses a batch
// timeout of ten seconds.
//
// The transforms and sinks slices are stored as-is (not copied), so every
// worker created by the returned factory refers to the same slices.
func WorkerCreator(transforms []types.Transformer, sinks []types.Sink) func(string, <-chan *types.LogBatch) *worker {
	const batchTimeout = 10 * time.Second

	newWorker := func(sourceName string, input <-chan *types.LogBatch) *worker {
		w := new(worker)
		w.SourceName = sourceName
		w.Input = input
		w.Transforms = transforms
		w.Sinks = sinks
		w.BatchTimeout = batchTimeout
		return w
	}
	return newWorker
}
// Run receives log batches from the worker's input channel and processes them
// one at a time, giving each batch its own context bounded by BatchTimeout.
// It blocks until the input channel is closed.
func (w *worker) Run() {
	for {
		batch, open := <-w.Input
		if !open {
			return
		}

		// A fresh deadline per batch; released as soon as the batch is done.
		ctx, cancel := context.WithTimeout(context.Background(), w.BatchTimeout)
		w.processBatch(ctx, batch)
		cancel()
	}
}
// processBatch runs a single batch through every transform in order, freezes
// the resulting logs, and then hands the batch to all sinks concurrently. It
// waits for every sink to return before releasing the batch and its logs via
// disposeBatch.
//
// If a transform returns an error, the batch is counted as dropped against
// that transform, disposed, and no sink is invoked.
func (w *worker) processBatch(ctx context.Context, batch *types.LogBatch) {
	for _, transform := range w.Transforms {
		next, err := transform.Transform(ctx, batch)
		if err != nil {
			// A failing transform is not required to hand back a usable
			// batch. Fall back to the batch that was passed in so the drop
			// accounting and disposal below never dereference a nil batch.
			dropped := next
			if dropped == nil {
				dropped = batch
			}
			logger.Warnf("Failed to transform logs from source %s -> %s: %v", w.SourceName, transform.Name(), err)
			metrics.LogsCollectorLogsDropped.WithLabelValues(w.SourceName, transform.Name()).Add(float64(len(dropped.Logs)))
			disposeBatch(dropped)
			return
		}
		batch = next
	}

	// Freeze the logs in the batch to prevent further modifications; the
	// same batch is shared by all sink goroutines below.
	for _, log := range batch.Logs {
		log.Freeze()
	}

	var wg sync.WaitGroup
	wg.Add(len(w.Sinks))
	for _, sink := range w.Sinks {
		go func(sink types.Sink) {
			defer wg.Done()
			sendToSink(ctx, sink, batch, w.SourceName)
		}(sink)
	}
	// The batch must not be disposed until every sink has finished with it.
	wg.Wait()

	disposeBatch(batch)
}
// sendToSink delivers batch to one sink and records the outcome: on success
// the number of logs in the batch is added to the sent counter, on failure a
// warning is logged and the same number is added to the dropped counter. Both
// counters are labelled with the source name and the sink name.
func sendToSink(ctx context.Context, sink types.Sink, batch *types.LogBatch, sourceName string) {
	sendErr := sink.Send(ctx, batch)
	if sendErr == nil {
		metrics.LogsCollectorLogsSent.WithLabelValues(sourceName, sink.Name()).Add(float64(len(batch.Logs)))
		return
	}

	logger.Warnf("Failed to send logs to sink %s -> %s: %v", sourceName, sink.Name(), sendErr)
	metrics.LogsCollectorLogsDropped.WithLabelValues(sourceName, sink.Name()).Add(float64(len(batch.Logs)))
}
// disposeBatch hands every log in the batch back to types.LogPool and then
// hands the batch itself back to types.LogBatchPool. Callers should treat the
// batch and its logs as released once this returns.
func disposeBatch(batch *types.LogBatch) {
	entries := batch.Logs
	for i := range entries {
		types.LogPool.Put(entries[i])
	}
	types.LogBatchPool.Put(batch)
}