// Excerpt: (*Source).Open from collector/logs/sources/journal/journal_linux.go (lines 53–122).

// Open starts tailing every configured journal target. For each target it
// wires up a three-stage pipeline, each stage on its own goroutine tracked
// by s.wg and cancelled via the derived context stored in s.closeFn:
//
//  1. a tailer reading journal entries into a per-target log channel,
//  2. engine.BatchLogs grouping those logs into batches, and
//  3. a worker (from s.workerCreator) consuming the finished batches.
//
// It always returns nil.
func (s *Source) Open(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	s.closeFn = cancel

	// Acks default to no-ops. When a cursor directory is configured, each
	// ack instead persists the log's journal cursor so reads can resume
	// from the same position after a restart.
	makeAck := func(*types.Log) func() { return func() {} }
	if s.cursorDirectory != "" {
		makeAck = func(log *types.Log) func() {
			return func() {
				path := types.StringOrEmpty(log.GetAttributeValue(journald_cursor_filename_attribute))
				cursor := types.StringOrEmpty(log.GetAttributeValue(journald_cursor_attribute))
				if path != "" && cursor != "" {
					writeCursor(path, cursor)
				}
			}
		}
	}

	tailers := make([]*tailer, 0, len(s.targets))
	for _, target := range s.targets {
		logger.Info("Opening journal source", "filters", target.Matches, "database", target.Database, "table", target.Table)

		rawLogs := make(chan *types.Log, 512)
		batches := make(chan *types.LogBatch, 1)

		t := &tailer{
			matches:        target.Matches,
			database:       target.Database,
			table:          target.Table,
			journalFields:  target.JournalFields,
			cursorFilePath: cursorPath(s.cursorDirectory, target.Matches, target.Database, target.Table),
			logLineParsers: target.LogLineParsers,
			batchQueue:     rawLogs,
			streamPartials: make(map[string]string),
		}

		// Stage 1: stream journal entries into rawLogs.
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			t.ReadFromJournal(ctx)
		}()

		// Stage 2: collect individual logs into batches.
		cfg := engine.BatchConfig{
			MaxBatchSize: 1000,
			MaxBatchWait: 1 * time.Second,
			InputQueue:   rawLogs,
			OutputQueue:  batches,
			AckGenerator: makeAck,
		}
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			engine.BatchLogs(ctx, cfg)
		}()

		// Stage 3: hand completed batches to a delivery worker.
		w := s.workerCreator("journal", batches)
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			w.Run()
		}()

		tailers = append(tailers, t)
	}
	s.tailers = tailers

	return nil
}