in collector/logs/sources/journal/tailer_linux.go [59:153]
func (t *tailer) ReadFromJournal(ctx context.Context) {
	t.logger = logger.Logger().With(
		slog.String("source", "journal"),
		slog.String("database", t.database),
		slog.String("table", t.table),
		slog.Any("matches", t.matches),
	)

	// Must lock this goroutine (and the lifecycle of the sdjournal.Journal object, which contains the sd_journal pointer) to the underlying OS thread.
	// man 3 sd_journal under "NOTES":
	// "given sd_journal pointer may only be used from one specific thread at all times (and it has to be the very same one during the entire lifetime of the object), but multiple, independent threads may use multiple, independent objects safely"
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	reader, err := t.openJournal(journalOpenModeStartup)
	if err != nil {
		t.logger.Error("Failed to open journal tailer. Exiting.", "err", err)
		return
	}
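	// Main read loop: advance through the journal, wait when caught up, and recover the reader on errors.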
	for {
		select {
		case <-ctx.Done():
			reader.Close()
			return
		default:
		}
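		// Next advances the journal's read pointer; a return of 0 means no newer entry is available yet.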
		ret, err := reader.Next()
		if err != nil {
			t.logger.Error("Failed to advance in journal", "err", err)
			reader, err = t.recoverJournal(reader)
			if err != nil {
				return // Recovery failed. Exit.
			}
			continue
		}

		if ret == 0 {
			// Wait for entries
			if err := t.waitForNewJournalEntries(ctx, reader); err != nil {
				t.logger.Error("Failed to wait for new journal entries", "err", err)
				reader, err = t.recoverJournal(reader)
				if err != nil {
					return // Recovery failed. Exit.
				}
				continue
			}
			continue
		}

		entry, err := reader.GetEntry()
		if err != nil {
			t.logger.Error("Failed to get journal entry", "err", err)
			reader, err = t.recoverJournal(reader)
			if err != nil {
				return // Recovery failed. Exit.
			}
			continue
		}

		message, isPartial := t.combinePartialMessages(entry)
		if isPartial {
			// We are waiting for more messages to combine
			continue
		}
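		// Build the outgoing log record from the completed message, using the pooled Log type.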
		log := types.LogPool.Get(1).(*types.Log)
		log.Reset()
		log.SetTimestamp(uint64(entry.RealtimeTimestamp) * 1000) // microseconds -> nanoseconds
		log.SetObservedTimestamp(uint64(time.Now().UnixNano()))
		log.SetAttributeValue(types.AttributeDatabaseName, t.database)
		log.SetAttributeValue(types.AttributeTableName, t.table)

		parser.ExecuteParsers(t.logLineParsers, log, message, "journald")

		// Write after parsing to ensure these values are always set to values we need for acking.
		log.SetAttributeValue(journald_cursor_attribute, entry.Cursor)
		log.SetAttributeValue(journald_cursor_filename_attribute, t.cursorFilePath)

		for _, field := range t.journalFields {
			if value, ok := entry.Fields[field]; ok {
				log.SetResourceValue(field, value)
			}
		}
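		// Queue the record for batching; the cursor attributes set above are what acking later relies on.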
		t.batchQueue <- log
	}
}
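
For orientation: the thread-affinity comment above names sdjournal.Journal, so the reader is assumed to wrap go-systemd's journal bindings. Below is a minimal standalone sketch of the same Next/Wait/GetEntry pattern against github.com/coreos/go-systemd/v22/sdjournal; it skips the tailer's match filters, cursor persistence, partial-message handling, and recovery, and is not the project's code.

// journal_read_sketch.go — illustrative only, not part of the collector.
package main

import (
	"fmt"
	"log"
	"runtime"
	"time"

	"github.com/coreos/go-systemd/v22/sdjournal"
)

func main() {
	// Same thread-affinity rule as above: keep the sd_journal pointer on one OS thread.
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	j, err := sdjournal.NewJournal()
	if err != nil {
		log.Fatalf("open journal: %v", err)
	}
	defer j.Close()

	for {
		n, err := j.Next() // advance the read pointer
		if err != nil {
			log.Fatalf("advance journal: %v", err)
		}
		if n == 0 {
			// Caught up with the journal: block until new entries are appended.
			j.Wait(sdjournal.IndefiniteWait)
			continue
		}
		entry, err := j.GetEntry()
		if err != nil {
			log.Fatalf("get entry: %v", err)
		}
		// RealtimeTimestamp is microseconds since the Unix epoch; entry.Cursor could be
		// persisted (as the tailer does via its cursor attributes) to resume later.
		ts := time.Unix(0, int64(entry.RealtimeTimestamp)*1000)
		fmt.Printf("%s %s\n", ts.Format(time.RFC3339Nano), entry.Fields["MESSAGE"])
	}
}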