plugins/input/journal/input_journal.go (295 lines of code) (raw):
//go:build linux
// +build linux
// Copyright 2017 Marcus Heese
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This code is based on the unit-filter logic of journalctl (i.e. `journalctl -u`)
// in the systemd source code.
// See: https://github.com/systemd/systemd/blob/master/src/journal/journalctl.c#L1410
// and https://github.com/systemd/systemd/blob/master/src/basic/unit-name.c
package journal
import (
"fmt"
"os"
"strconv"
"sync"
"time"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/coreos/go-systemd/sdjournal"
)
// Named constants for the journal cursor placement positions. These are the
// values accepted by ServiceJournal.SeekPosition and
// ServiceJournal.CursorSeekFallback.
const (
	// SeekPositionHead places the read pointer at the start of the journal.
	SeekPositionHead = "head"
	// SeekPositionTail places the read pointer at the end of the journal.
	SeekPositionTail = "tail"
	// SeekPositionCursor names the saved-cursor position.
	SeekPositionCursor = "cursor"
	// SeekPositionDefault is the "none" placeholder position.
	SeekPositionDefault = "none"
)
const (
	// checkPointKey is the key under which the journal cursor is checkpointed.
	checkPointKey = "journal_v1"
	// defaultResetInterval is the journal reopen interval in seconds (one
	// hour), used when ResetIntervalSecond is not positive.
	defaultResetInterval = 3600
)
// SyslogFacilityString maps a syslog facility number, given as the decimal
// string stored in the journal's SYSLOG_FACILITY field, to a textual name.
// Codes 9 and 15 both map to "clock daemon", as in the standard syslog
// facility table.
var SyslogFacilityString = func() map[string]string {
	names := [...]string{
		0:  "kernel",
		1:  "user",
		2:  "mail",
		3:  "daemon",
		4:  "auth",
		5:  "syslog",
		6:  "line printer",
		7:  "network news",
		8:  "uucp",
		9:  "clock daemon",
		10: "security/auth",
		11: "ftp",
		12: "ntp",
		13: "log audit",
		14: "log alert",
		15: "clock daemon",
		16: "local0",
		17: "local1",
		18: "local2",
		19: "local3",
		20: "local4",
		21: "local5",
		22: "local6",
		23: "local7",
	}
	byCode := make(map[string]string, len(names))
	for code, name := range names {
		byCode[strconv.Itoa(code)] = name
	}
	return byCode
}()
// PriorityConversionMap maps a journal PRIORITY value, given as a decimal
// string from "0" to "7", to its textual severity name.
var PriorityConversionMap = func() map[string]string {
	severities := [...]string{
		0: "emergency",
		1: "alert",
		2: "critical",
		3: "error",
		4: "warning",
		5: "notice",
		6: "informational",
		7: "debug",
	}
	byLevel := make(map[string]string, len(severities))
	for level, name := range severities {
		byLevel[strconv.Itoa(level)] = name
	}
	return byLevel
}()
// ServiceJournal is a service input that reads entries from the systemd
// journal and forwards each one to the pipeline collector. The exported
// fields are the plugin configuration; their defaults are set by the factory
// registered in init.
type ServiceJournal struct {
	// SeekPosition selects where reading starts when no checkpoint cursor is
	// stored: "head" seeks to the start of the journal, any other value to
	// the tail.
	SeekPosition string

	// CursorFlushPeriodMs is the minimum time, in milliseconds, between two
	// non-forced checkpoint saves.
	CursorFlushPeriodMs int

	// CursorSeekFallback is meant to name the position ("head"/"tail") to use
	// when seeking to the saved cursor fails.
	// NOTE(review): confirm initJournal actually consults this field.
	CursorSeekFallback string

	// Units lists systemd units to filter on. The matches are presumably
	// installed by addUnits, which is defined outside this view.
	Units []string

	// Kernel adds a _TRANSPORT=kernel match, but only when Units is non-empty.
	Kernel bool

	// Identifiers lists SYSLOG_IDENTIFIER values whose entries are included.
	Identifiers []string

	// JournalPaths selects the journal source:
	//   - empty opens the default journal;
	//   - a single directory opens that directory;
	//   - anything else is opened as a list of journal files.
	JournalPaths []string

	// MatchPatterns are raw journal match expressions (presumably
	// "FIELD=value"), each passed to AddMatch and followed by a disjunction.
	MatchPatterns []string

	// ParseSyslogFacility replaces the numeric SYSLOG_FACILITY field with the
	// value looked up in SyslogFacilityString.
	ParseSyslogFacility bool

	// ParsePriority replaces the numeric PRIORITY field with the value looked
	// up in PriorityConversionMap.
	ParsePriority bool

	// UseJournalEventTime stamps each log with the entry's realtime timestamp
	// instead of the time it was read.
	UseJournalEventTime bool

	// ResetIntervalSecond is how often, in seconds, Start closes and reopens
	// the journal. SaveCheckpoint replaces non-positive values with
	// defaultResetInterval.
	ResetIntervalSecond int

	journal        *sdjournal.Journal // open journal handle; nil between runs
	lastSaveCPTime time.Time          // time of the last non-forced checkpoint save
	lastCPCursor   string             // cursor loaded from the checkpoint store
	shutdown       chan struct{}      // closed by Stop to end Start's loop
	waitGroup      sync.WaitGroup     // tracks the running Start call; Stop waits on it
	context        pipeline.Context   // pipeline context stored by Init
}
// SaveCheckpoint stores the cursor of the journal's current read position
// under checkPointKey.
//
// Unless forceFlag is set, saves are throttled to at most one per
// CursorFlushPeriodMs milliseconds. Nothing is saved while no journal is
// open. Failures to read the cursor or to persist it are logged, not
// returned.
func (sj *ServiceJournal) SaveCheckpoint(forceFlag bool) {
	if !forceFlag {
		now := time.Now()
		minGap := time.Duration(sj.CursorFlushPeriodMs) * time.Millisecond
		if now.Sub(sj.lastSaveCPTime) < minGap {
			return
		}
		sj.lastSaveCPTime = now
	}
	if sj.journal == nil {
		return
	}
	// NOTE(review): normalising the reset interval is unrelated to
	// checkpointing; it is kept here to preserve existing behaviour.
	if sj.ResetIntervalSecond <= 0 {
		sj.ResetIntervalSecond = defaultResetInterval
	}
	cursor, err := sj.journal.GetCursor()
	if err != nil {
		logger.Error(sj.context.GetRuntimeContext(), "SAVE_CHECKPOINT_ALARM", "get cursor error", err)
		return
	}
	if err = sj.context.SaveCheckPoint(checkPointKey, []byte(cursor)); err != nil {
		logger.Error(sj.context.GetRuntimeContext(), "SAVE_CHECKPOINT_ALARM", "save checkpoint error", err)
	}
}
// LoadCheckpoint restores lastCPCursor from the checkpoint stored under
// checkPointKey. When no checkpoint exists, the current value is left as is.
func (sj *ServiceJournal) LoadCheckpoint() {
	raw, found := sj.context.GetCheckPoint(checkPointKey)
	if !found {
		return
	}
	sj.lastCPCursor = string(raw)
}
// Init stores the pipeline context used for logging and checkpointing.
// It always succeeds and returns 0.
func (sj *ServiceJournal) Init(context pipeline.Context) (interval int, err error) {
	sj.context = context
	return
}
// addKernel adds the kernel-transport match when kernel capture is enabled
// and at least one unit filter is configured. This mirrors journalctl's
// unit-filter logic referenced in the file header. Otherwise it does nothing.
func (sj *ServiceJournal) addKernel() error {
	if !sj.Kernel || len(sj.Units) == 0 {
		return nil
	}
	if err := sj.addMatchesForKernel(); err != nil {
		return fmt.Errorf("Adding filter for kernel failed: %v", err)
	}
	return nil
}
// addMatchesForKernel adds a _TRANSPORT=kernel match followed by a
// disjunction, so that later matches are OR-ed with it.
func (sj *ServiceJournal) addMatchesForKernel() error {
	if err := sj.journal.AddMatch("_TRANSPORT=kernel"); err != nil {
		return err
	}
	return sj.journal.AddDisjunction()
}
// addSyslogIdentifiers adds one SYSLOG_IDENTIFIER match per configured
// identifier, each followed by a disjunction. It stops at the first failure
// and returns an error naming the offending identifier.
func (sj *ServiceJournal) addSyslogIdentifiers() error {
	for _, id := range sj.Identifiers {
		match := sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER + "=" + id
		err := sj.journal.AddMatch(match)
		if err == nil {
			err = sj.journal.AddDisjunction()
		}
		if err != nil {
			return fmt.Errorf("Filtering syslog identifier %s failed: %v", id, err)
		}
	}
	return nil
}
// Description returns a short human-readable summary of the plugin.
func (sj *ServiceJournal) Description() string {
	const summary = "journal input plugin for logtail"
	return summary
}
// Collect is a no-op. Journal entries are pushed to the collector by the
// reader goroutine that Start launches, not polled through this method.
func (sj *ServiceJournal) Collect(_ pipeline.Collector) error {
	return nil
}
// initJournal opens the journal selected by JournalPaths, installs the
// configured filters and positions the read pointer.
//
// Filters are installed in this order: units, raw match patterns, the kernel
// transport, and syslog identifiers.
//
// Positioning:
//   - If a checkpoint cursor is stored, the journal seeks to it and steps once
//     with Next.
//   - If that seek fails, CursorSeekFallback decides the position.
//   - Without a checkpoint, SeekPosition decides.
//   - "head" seeks to the start of the journal; any other value to the tail.
//
// If anything fails after the journal has been opened, the handle is closed
// and sj.journal is reset to nil, so a failed (re)initialisation does not
// leak it.
func (sj *ServiceJournal) initJournal() (err error) {
	seekToHelper := func(position string, seekErr error) error {
		if seekErr == nil {
			logger.Infof(sj.context.GetRuntimeContext(), "Seek to [%s] successful", position)
		} else {
			logger.Warningf(sj.context.GetRuntimeContext(), "JOURNAL_SEEK_ALARM", "Could not seek to %s: %v", position, seekErr)
		}
		return seekErr
	}

	// Connect to the systemd journal.
	switch len(sj.JournalPaths) {
	case 0:
		sj.journal, err = sdjournal.NewJournal()
	case 1:
		fi, statErr := os.Stat(sj.JournalPaths[0])
		if statErr != nil {
			return statErr
		}
		if fi.IsDir() {
			sj.journal, err = sdjournal.NewJournalFromDir(sj.JournalPaths[0])
		} else {
			sj.journal, err = sdjournal.NewJournalFromFiles(sj.JournalPaths...)
		}
	default:
		sj.journal, err = sdjournal.NewJournalFromFiles(sj.JournalPaths...)
	}
	if err != nil {
		return err
	}
	logger.Info(sj.context.GetRuntimeContext(), "journal", "open success")

	// From here on the journal is open. On failure run() is never started,
	// so nothing else would close the handle.
	defer func() {
		if err != nil {
			_ = sj.journal.Close()
			sj.journal = nil
		}
	}()

	// Add specific units to monitor, if any.
	if err = sj.addUnits(); err != nil {
		return err
	}
	// Add specific patterns to monitor, if any.
	for _, pattern := range sj.MatchPatterns {
		if err = sj.journal.AddMatch(pattern); err == nil {
			err = sj.journal.AddDisjunction()
		}
		if err != nil {
			return fmt.Errorf("Filtering pattern %s failed: %w", pattern, err)
		}
	}
	// Add kernel logs.
	if err = sj.addKernel(); err != nil {
		return err
	}
	// Add syslog identifiers to monitor, if any.
	if err = sj.addSyslogIdentifiers(); err != nil {
		return err
	}

	sj.LoadCheckpoint()
	position := sj.SeekPosition
	if len(sj.lastCPCursor) != 0 {
		if seekToHelper(sj.lastCPCursor, sj.journal.SeekCursor(sj.lastCPCursor)) == nil {
			// Next places the read pointer on the checkpointed entry, which was
			// already emitted before its cursor was saved. This presumes Follow
			// advances with Next before reading — confirm. A failure here is
			// only logged, matching the previous behaviour.
			if _, nextErr := sj.journal.Next(); nextErr != nil {
				logger.Warning(sj.context.GetRuntimeContext(), "JOURNAL_READ_ALARM", "call next when init error", nextErr)
			}
			return nil
		}
		// The saved cursor is unusable. Honour the configured fallback instead
		// of reading from wherever the freshly opened journal happens to point.
		position = sj.CursorSeekFallback
	}

	switch position {
	case SeekPositionHead:
		err = seekToHelper(SeekPositionHead, sj.journal.SeekHead())
	default:
		err = seekToHelper(SeekPositionTail, sj.journal.SeekTail())
	}
	if err != nil {
		return fmt.Errorf("Seeking to a good position in journal failed: %w", err)
	}
	return nil
}
// Start runs the journal input until Stop is called.
//
// Each cycle opens and positions the journal (initJournal) and launches a
// reader goroutine (run) that pushes entries into c. After
// ResetIntervalSecond seconds, Start stops the reader, which checkpoints and
// closes the journal, and then reopens it from the checkpoint to release
// memory. Start returns an error only if (re)initialising the journal fails.
func (sj *ServiceJournal) Start(c pipeline.Collector) error {
	sj.shutdown = make(chan struct{})
	sj.waitGroup.Add(1)
	defer sj.waitGroup.Done()

	// Normalise the interval before any reader goroutine exists, for two
	// reasons:
	//   - A non-positive value makes the timer below fire at once, so the
	//     journal would be reopened in a tight loop.
	//   - SaveCheckpoint (called from the reader goroutine) would otherwise
	//     rewrite this field concurrently with the read below.
	if sj.ResetIntervalSecond <= 0 {
		sj.ResetIntervalSecond = defaultResetInterval
	}
	resetInterval := time.Duration(sj.ResetIntervalSecond) * time.Second

	for {
		if err := sj.initJournal(); err != nil {
			return err
		}
		runShutdown := make(chan struct{})
		var runWaitGroup sync.WaitGroup
		runWaitGroup.Add(1)
		go sj.run(c, runShutdown, &runWaitGroup)

		t := time.NewTimer(resetInterval)
		select {
		case <-t.C:
		case <-sj.shutdown:
		}
		t.Stop()

		// Stop the reader and wait until it has checkpointed and closed the journal.
		close(runShutdown)
		runWaitGroup.Wait()

		select {
		case <-sj.shutdown:
			return nil
		default:
		}
		logger.Infof(sj.context.GetRuntimeContext(), "restart journal to release memory, interval: %vs", sj.ResetIntervalSecond)
	}
}
// Stop signals Start to finish by closing the shutdown channel. It then
// blocks until Start has returned; the reader goroutine has stopped and
// closed the journal by then. Stop must be called at most once, after Start.
func (sj *ServiceJournal) Stop() error {
	done := sj.shutdown
	close(done)
	sj.waitGroup.Wait()
	return nil
}
// run forwards journal entries to c until the channel returned by Follow is
// closed. Follow is defined elsewhere; presumably it closes the channel once
// shutdown is closed.
//
// Each entry becomes one log made of:
//   - its journal fields, used as the tags map;
//   - two extra columns, "_realtime_timestamp_" and "_monotonic_timestamp_",
//     holding the raw timestamps in decimal.
//
// With ParsePriority / ParseSyslogFacility, a numeric PRIORITY /
// SYSLOG_FACILITY value is replaced by its textual name. Values with no table
// entry are kept as-is rather than blanked. The realtime timestamp is treated
// as microseconds when UseJournalEventTime is set.
//
// On exit run saves a final checkpoint, closes the journal, sets sj.journal
// to nil, and signals wg.
func (sj *ServiceJournal) run(c pipeline.Collector, shutdown chan struct{}, wg *sync.WaitGroup) {
	defer func() {
		sj.SaveCheckpoint(true)
		logger.Info(sj.context.GetRuntimeContext(), "journal", "start close")
		_ = sj.journal.Close()
		logger.Info(sj.context.GetRuntimeContext(), "journal", "close success")
		sj.journal = nil
		wg.Done()
	}()

	// translate swaps fields[key] for its textual name from table, keeping the
	// raw value when the table has no entry for it.
	translate := func(fields map[string]string, key string, table map[string]string) {
		if raw, ok := fields[key]; ok {
			if text, known := table[raw]; known {
				fields[key] = text
			}
		}
	}

	// columns is fixed; values is overwritten in place for every entry.
	columns := []string{"_realtime_timestamp_", "_monotonic_timestamp_"}
	values := []string{"", ""}
	for rawEvent := range sj.Follow(sj.journal, shutdown) {
		if sj.ParsePriority {
			translate(rawEvent.Fields, sdjournal.SD_JOURNAL_FIELD_PRIORITY, PriorityConversionMap)
		}
		if sj.ParseSyslogFacility {
			translate(rawEvent.Fields, sdjournal.SD_JOURNAL_FIELD_SYSLOG_FACILITY, SyslogFacilityString)
		}
		var eventTime time.Time
		if sj.UseJournalEventTime {
			// Microseconds -> nanoseconds.
			eventTime = time.Unix(0, int64(rawEvent.RealtimeTimestamp)*1000)
		} else {
			eventTime = time.Now()
		}
		values[0] = strconv.FormatUint(rawEvent.RealtimeTimestamp, 10)
		values[1] = strconv.FormatUint(rawEvent.MonotonicTimestamp, 10)
		c.AddDataArray(rawEvent.Fields, columns, values, eventTime)
		sj.SaveCheckpoint(false)
	}
	logger.Info(sj.context.GetRuntimeContext(), "service journal sync", "done")
}
// init registers the "service_journal" service input. Its factory returns a
// ServiceJournal configured with these defaults:
//   - SeekPosition and CursorSeekFallback set to tail;
//   - a 5000 ms cursor flush period;
//   - Kernel enabled;
//   - a reset interval of defaultResetInterval seconds.
func init() {
	pipeline.ServiceInputs["service_journal"] = func() pipeline.ServiceInput {
		sj := &ServiceJournal{Kernel: true}
		sj.SeekPosition = SeekPositionTail
		sj.CursorSeekFallback = SeekPositionTail
		sj.CursorFlushPeriodMs = 5000
		sj.ResetIntervalSecond = defaultResetInterval
		return sj
	}
}