func StartTailing()

in collector/logs/sources/tail/tailer.go [39:110]


// StartTailing begins tailing config.Target.FilePath and wires the tailed
// lines through a batching stage to a worker built by config.WorkerCreator.
//
// If a cursor can be read for the target file (looked up under
// config.CursorDirectory), the tail resumes from the recorded file identifier
// and offset. Otherwise no explicit seek location is set and the tail
// library's default start location applies.
//
// Three goroutines are started and registered on the returned Tailer's wait
// group: one running readLines, one running engine.BatchLogs, and one running
// the worker. The cancel function of the context handed to engine.BatchLogs is
// stored on the Tailer as shutdown.
//
// A non-nil error is returned only when tail.TailFile fails; in that case the
// context has been cancelled and no goroutines have been started.
func StartTailing(config TailerConfig) (*Tailer, error) {
	ctx, shutdown := context.WithCancel(context.Background())

	// readLines -> batchQueue -> engine.BatchLogs -> outputQueue -> worker.
	batchQueue := make(chan *types.Log, 512)
	outputQueue := make(chan *types.LogBatch, 1)

	tailConfig := tail.Config{Follow: true, ReOpen: true, Poll: true}
	existingCursorPath := cursorPath(config.CursorDirectory, config.Target.FilePath)
	fileID, position, err := readCursor(existingCursorPath)
	if err == nil {
		if logger.IsDebug() {
			logger.Debugf("TailSource: found existing cursor for file %q: %s %d", config.Target.FilePath, fileID, position)
		}
		tailConfig.Location = &tail.SeekInfo{
			Offset:         position,
			Whence:         io.SeekStart,
			FileIdentifier: fileID,
		}
	} else if logger.IsDebug() {
		// A failed cursor read is deliberately non-fatal (presumably it also
		// covers the "no cursor written yet" case — verify against
		// readCursor), but surface it so an unreadable or corrupt cursor is
		// not silently indistinguishable from a fresh start.
		logger.Debugf("TailSource: no usable cursor for file %q at %q: %v", config.Target.FilePath, existingCursorPath, err)
	}

	tailFile, err := tail.TailFile(config.Target.FilePath, tailConfig)
	// This error is fatal. They are only returned in cases of misconfiguration in this path.
	if err != nil {
		// Release the context; nothing else has been started yet.
		shutdown()
		return nil, fmt.Errorf("addTarget create tailfile: %w", err)
	}

	parsers := parser.NewParsers(config.Target.Parsers, fmt.Sprintf("tailfile %q", config.Target.FilePath))

	// Copy the configured resources so the Tailer owns its own map rather
	// than sharing the one held by the config.
	attributes := make(map[string]interface{}, len(config.Target.Resources))
	for k, v := range config.Target.Resources {
		attributes[k] = v
	}

	tailer := &Tailer{
		tail:           tailFile,
		shutdown:       shutdown,
		database:       config.Target.Database,
		table:          config.Target.Table,
		logTypeParser:  sourceparse.GetLogTypeParser(config.Target.LogType),
		logLineParsers: parsers,
		resources:      attributes,
	}

	// Stage 1: read lines from the tailed file into batchQueue.
	tailer.wg.Add(1)
	go func() {
		defer tailer.wg.Done()
		readLines(tailer, config.UpdateChan, batchQueue)
	}()

	batchConfig := engine.BatchConfig{
		MaxBatchSize: 1000,
		MaxBatchWait: 1 * time.Second,
		InputQueue:   batchQueue,
		OutputQueue:  outputQueue,
		AckGenerator: config.AckGenerator,
	}

	// Stage 2: group logs from batchQueue into batches on outputQueue.
	tailer.wg.Add(1)
	go func() {
		defer tailer.wg.Done()
		engine.BatchLogs(ctx, batchConfig)
	}()

	// Stage 3: hand batches from outputQueue to the configured worker.
	worker := config.WorkerCreator(config.WorkerName, outputQueue)
	tailer.wg.Add(1)
	go func() {
		defer tailer.wg.Done()
		worker.Run()
	}()

	return tailer, nil
}