in collector/logs/sources/tail/tailer.go [39:110]
// StartTailing opens config.Target.FilePath for tailing and starts the
// goroutines that move its lines through the pipeline.
//
// The file is tailed with Follow, ReOpen and Poll enabled. When readCursor
// succeeds for the file's cursor path, tailing resumes from the stored file
// identifier and offset; when it fails, the tail configuration is left without
// an explicit location.
//
// Three goroutines are launched, each registered on the returned Tailer's
// WaitGroup:
//   - readLines, feeding the batch queue,
//   - engine.BatchLogs, grouping batch-queue entries onto the output queue,
//   - the Run method of the worker built by config.WorkerCreator, which is
//     handed the output queue.
//
// The cancel function for the context given to engine.BatchLogs is stored on
// the Tailer as its shutdown field.
//
// An error is returned only when tail.TailFile fails; in that case the context
// is cancelled and no goroutine has been started.
func StartTailing(config TailerConfig) (*Tailer, error) {
	ctx, shutdown := context.WithCancel(context.Background())

	filePath := config.Target.FilePath
	tailConfig := tail.Config{Follow: true, ReOpen: true, Poll: true}

	// Best effort: a cursor that cannot be read simply means no resume point.
	cursorFileID, cursorOffset, cursorErr := readCursor(cursorPath(config.CursorDirectory, filePath))
	if cursorErr == nil {
		if logger.IsDebug() {
			logger.Debugf("TailSource: found existing cursor for file %q: %s %d", filePath, cursorFileID, cursorOffset)
		}
		tailConfig.Location = &tail.SeekInfo{
			Offset:         cursorOffset,
			Whence:         io.SeekStart,
			FileIdentifier: cursorFileID,
		}
	}

	// A failure here is fatal; per the original author it only happens when
	// this path is misconfigured.
	tailFile, err := tail.TailFile(filePath, tailConfig)
	if err != nil {
		shutdown()
		return nil, fmt.Errorf("addTarget create tailfile: %w", err)
	}

	// Copy the resources so the tailer owns its own map.
	resources := make(map[string]interface{})
	for key, value := range config.Target.Resources {
		resources[key] = value
	}

	tailer := &Tailer{
		tail:           tailFile,
		shutdown:       shutdown,
		database:       config.Target.Database,
		table:          config.Target.Table,
		logTypeParser:  sourceparse.GetLogTypeParser(config.Target.LogType),
		logLineParsers: parser.NewParsers(config.Target.Parsers, fmt.Sprintf("tailfile %q", filePath)),
		resources:      resources,
	}

	// spawn runs fn on its own goroutine, tracked by the tailer's WaitGroup.
	spawn := func(fn func()) {
		tailer.wg.Add(1)
		go func() {
			defer tailer.wg.Done()
			fn()
		}()
	}

	// Stage 1: lines read from the file go onto the batch queue.
	batchQueue := make(chan *types.Log, 512)
	spawn(func() {
		readLines(tailer, config.UpdateChan, batchQueue)
	})

	// Stage 2: queued logs are batched onto the output queue.
	outputQueue := make(chan *types.LogBatch, 1)
	batchConfig := engine.BatchConfig{
		MaxBatchSize: 1000,
		MaxBatchWait: 1 * time.Second,
		InputQueue:   batchQueue,
		OutputQueue:  outputQueue,
		AckGenerator: config.AckGenerator,
	}
	spawn(func() {
		engine.BatchLogs(ctx, batchConfig)
	})

	// Stage 3: the worker is given the output queue to consume.
	worker := config.WorkerCreator(config.WorkerName, outputQueue)
	spawn(func() {
		worker.Run()
	})

	return tailer, nil
}