collector/logs/sources/tail/tailer.go (151 lines of code) (raw):

package tail import ( "context" "fmt" "io" "sync" "time" "github.com/Azure/adx-mon/collector/logs/engine" "github.com/Azure/adx-mon/collector/logs/sources/tail/sourceparse" "github.com/Azure/adx-mon/collector/logs/transforms/parser" "github.com/Azure/adx-mon/collector/logs/types" "github.com/Azure/adx-mon/pkg/logger" "github.com/tenebris-tech/tail" ) type TailerConfig struct { Target FileTailTarget UpdateChan <-chan FileTailTarget AckGenerator func(*types.Log) func() WorkerCreator engine.WorkerCreatorFunc CursorDirectory string WorkerName string } // Tailer is a specific instance of a file being tailed. type Tailer struct { tail *tail.Tail shutdown context.CancelFunc wg sync.WaitGroup database string table string logTypeParser sourceparse.LogTypeParser logLineParsers []parser.Parser resources map[string]interface{} } func StartTailing(config TailerConfig) (*Tailer, error) { ctx, shutdown := context.WithCancel(context.Background()) batchQueue := make(chan *types.Log, 512) outputQueue := make(chan *types.LogBatch, 1) tailConfig := tail.Config{Follow: true, ReOpen: true, Poll: true} existingCursorPath := cursorPath(config.CursorDirectory, config.Target.FilePath) fileId, position, err := readCursor(existingCursorPath) if err == nil { if logger.IsDebug() { logger.Debugf("TailSource: found existing cursor for file %q: %s %d", config.Target.FilePath, fileId, position) } tailConfig.Location = &tail.SeekInfo{ Offset: position, Whence: io.SeekStart, FileIdentifier: fileId, } } tailFile, err := tail.TailFile(config.Target.FilePath, tailConfig) // This error is fatal. They are only returned in cases of misconfiguration in this path. if err != nil { shutdown() return nil, fmt.Errorf("addTarget create tailfile: %w", err) } parsers := parser.NewParsers(config.Target.Parsers, fmt.Sprintf("tailfile %q", config.Target.FilePath)) attributes := make(map[string]interface{}) for k, v := range config.Target.Resources { attributes[k] = v } tailer := &Tailer{ tail: tailFile, shutdown: shutdown, database: config.Target.Database, table: config.Target.Table, logTypeParser: sourceparse.GetLogTypeParser(config.Target.LogType), logLineParsers: parsers, resources: attributes, } tailer.wg.Add(1) go func() { defer tailer.wg.Done() readLines(tailer, config.UpdateChan, batchQueue) }() batchConfig := engine.BatchConfig{ MaxBatchSize: 1000, MaxBatchWait: 1 * time.Second, InputQueue: batchQueue, OutputQueue: outputQueue, AckGenerator: config.AckGenerator, } tailer.wg.Add(1) go func() { defer tailer.wg.Done() engine.BatchLogs(ctx, batchConfig) }() worker := config.WorkerCreator(config.WorkerName, outputQueue) tailer.wg.Add(1) go func() { defer tailer.wg.Done() worker.Run() }() return tailer, nil } // Stop stops the tailer and cleans up resources. // Does not wait for the tailer to finish processing, to allow closing many tailers concurrently. // Call Wait() after calling Stop() to wait for the tailer to finish processing. func (t *Tailer) Stop() { t.tail.Cleanup() t.tail.Stop() t.shutdown() } // Wait waits for the tailer to finish processing. func (t *Tailer) Wait() { t.wg.Wait() } // Consumes until tailer.tail is closed func readLines(tailer *Tailer, updateChannel <-chan FileTailTarget, outputQueue chan<- *types.Log) { for { select { // Receive updates from the optional updateChannel. case newTarget, ok := <-updateChannel: if ok { newParsers := parser.NewParsers(newTarget.Parsers, fmt.Sprintf("tailfile %q", newTarget.FilePath)) tailer.logLineParsers = newParsers tailer.database = newTarget.Database tailer.table = newTarget.Table tailer.resources = make(map[string]interface{}) for k, v := range newTarget.Resources { tailer.resources[k] = v } } case line, ok := <-tailer.tail.Lines: if !ok { logger.Infof("readLines: tailer closed the channel for filename %q", tailer.tail.Filename) return // No longer getting lines due to the tailer being closed. Exit. } if line.Err != nil { logger.Errorf("readLines: tailer error for filename %q: %v", tailer.tail.Filename, line.Err) //skip continue } log := types.LogPool.Get(1).(*types.Log) log.Reset() message, isPartial, err := tailer.logTypeParser.Parse(line.Text, log) if err != nil { logger.Errorf("readLines: parselog error for filename %q: %v", tailer.tail.Filename, err) //skip types.LogPool.Put(log) continue } if isPartial { types.LogPool.Put(log) continue } position := line.Offset currentFileId := line.FileIdentifier log.SetAttributeValue(types.AttributeDatabaseName, tailer.database) log.SetAttributeValue(types.AttributeTableName, tailer.table) for k, v := range tailer.resources { log.SetResourceValue(k, v) } parser.ExecuteParsers(tailer.logLineParsers, log, message, tailer.tail.Filename) // Write after parsing to ensure these values are always set to values we need for acking. log.SetAttributeValue(cursor_position, position) log.SetAttributeValue(cursor_file_id, currentFileId) log.SetAttributeValue(cursor_file_name, tailer.tail.Filename) outputQueue <- log } } }