in collector/logs/sources/journal/journal_linux.go [53:122]
func (s *Source) Open(ctx context.Context) error {
ctx, closeFn := context.WithCancel(ctx)
s.closeFn = closeFn
ackGenerator := func(*types.Log) func() { return func() {} }
if s.cursorDirectory != "" {
ackGenerator = func(log *types.Log) func() {
return func() {
cursorFilePath := types.StringOrEmpty(log.GetAttributeValue(journald_cursor_filename_attribute))
cursorValue := types.StringOrEmpty(log.GetAttributeValue(journald_cursor_attribute))
if cursorFilePath == "" || cursorValue == "" {
return
}
writeCursor(cursorFilePath, cursorValue)
}
}
}
tailers := make([]*tailer, 0, len(s.targets))
for _, target := range s.targets {
logger.Info("Opening journal source", "filters", target.Matches, "database", target.Database, "table", target.Table)
batchQueue := make(chan *types.Log, 512)
outputQueue := make(chan *types.LogBatch, 1)
cPath := cursorPath(s.cursorDirectory, target.Matches, target.Database, target.Table)
tailer := &tailer{
matches: target.Matches,
database: target.Database,
table: target.Table,
journalFields: target.JournalFields,
cursorFilePath: cPath,
logLineParsers: target.LogLineParsers,
batchQueue: batchQueue,
streamPartials: make(map[string]string),
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
tailer.ReadFromJournal(ctx)
}()
batchConfig := engine.BatchConfig{
MaxBatchSize: 1000,
MaxBatchWait: 1 * time.Second,
InputQueue: batchQueue,
OutputQueue: outputQueue,
AckGenerator: ackGenerator,
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
engine.BatchLogs(ctx, batchConfig)
}()
worker := s.workerCreator("journal", outputQueue)
s.wg.Add(1)
go func() {
defer s.wg.Done()
worker.Run()
}()
tailers = append(tailers, tailer)
}
s.tailers = tailers
return nil
}