filebeat/input/journald/pkg/journalctl/reader.go (235 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build linux
package journalctl
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/elastic-agent-libs/logp"
)
// localSystemJournalID is the ID of the local system journal. When it is
// passed to New as the journal file, no --file argument is added to the
// journalctl command line.
const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL"
// sinceTimeFormat is the time layout used to format the value of the --since
// flag passed to journalctl. It follows a pattern accepted by multiple
// versions of Systemd/Journald.
const sinceTimeFormat = "2006-01-02 15:04:05.999999999"
// ErrCancelled indicates the read was cancelled.
var ErrCancelled = errors.New("cancelled")
// ErrRestarting indicates that journalctl has been restarted and no entry
// was read by this call. Reader.Next retries when it receives this error.
var ErrRestarting = errors.New("restarting journalctl")
// JournalEntry holds all fields of a journal entry plus cursor and timestamps.
type JournalEntry struct {
// Fields contains every key decoded from the JSON object emitted by
// journalctl, including the cursor and timestamp keys.
Fields map[string]any
// Cursor is the value of the entry's `__CURSOR` field.
Cursor string
// RealtimeTimestamp is the entry's `__REALTIME_TIMESTAMP` field parsed as
// a base-10 unsigned integer.
RealtimeTimestamp uint64
// MonotonicTimestamp is the entry's `__MONOTONIC_TIMESTAMP` field parsed
// as a base-10 unsigned integer.
MonotonicTimestamp uint64
}
// JctlFactory is a function that returns an instance of journalctl ready to
// use. It receives a canceller, a logger, the binary to run and the
// command-line arguments for it. It exists to allow testing.
type JctlFactory func(canceller input.Canceler, logger *logp.Logger, binary string, args ...string) (Jctl, error)
// Jctl abstracts the call to journalctl, it exists only for testing purposes.
//
//go:generate moq --fmt gofmt -out jctlmock_test.go . Jctl
type Jctl interface {
// Next returns the raw data of the next journal entry. If there is no
// entry available Next will block until there is an entry or cancel is
// cancelled.
//
// If cancel is cancelled, Next returns ErrCancelled.
//
// If finished is true, then journalctl returned all messages
// and exited successfully.
Next(input.Canceler) (data []byte, finished bool, err error)
// Kill stops journalctl. Reader.Close calls it to shut the process down.
Kill() error
}
// readerState tracks which phase of reading the Reader is in.
type readerState uint8
const (
// readingOldEntriesState is the initial state when there might be
// messages from previous boots to read. journalctl is run without
// --follow until it exits successfully.
readingOldEntriesState readerState = iota
// followingState is entered once journalctl has finished reading the
// existing entries (or when there are none to read); restarts add the
// --follow flag.
followingState
)
// Reader reads entries from journald by calling `journalctl`
// and reading its output.
//
// We call `journalctl` because it proved to be the most resilient way of
// reading journal entries. We have tried to use
// `github.com/coreos/go-systemd/v22/sdjournal`, however due to a bug in
// libsystemd (https://github.com/systemd/systemd/pull/29456) Filebeat
// would crash during journal rotation on high throughput systems.
//
// More details can be found in the PR introducing this feature and related
// issues. PR: https://github.com/elastic/beats/pull/40061.
type Reader struct {
// logger is the logger for the reader
logger *logp.Logger
// jctlLogger is the logger for the code controlling
// the journalctl process
jctlLogger *logp.Logger
// args are arguments for journalctl that never change,
// like the message filters, format, etc
args []string
// firstRunArgs are the arguments used in the first call to
// journalctl that will be replaced by the cursor argument
// once data has been ingested
firstRunArgs []string
// cursor is the journalctl cursor, it is also stored in Filebeat's registry.
// It is updated with every successfully decoded entry.
cursor string
// canceler is the canceller received by New; it is handed to the
// journalctl factory every time journalctl is (re)started.
canceler input.Canceler
// jctl is the current journalctl instance; it is replaced on restart.
jctl Jctl
// jctlFactory creates journalctl instances.
jctlFactory JctlFactory
// backoff is used to wait between restarts after journalctl returned
// an error.
backoff backoff.Backoff
// state tells whether the reader is still reading old entries or is
// already following the journal.
state readerState
}
// handleSeekAndCursor builds the journalctl arguments that select where
// reading starts.
//
// A non-empty cursor always takes precedence: only `--after-cursor` is
// returned and the seek mode is ignored. Without a cursor the arguments are
// derived from mode; for SeekSince the start time is the current time plus
// since, formatted with sinceTimeFormat.
//
// The returned bool reports whether there might be messages from previous
// boots to read before following the journal.
func handleSeekAndCursor(mode SeekMode, since time.Duration, cursor string) ([]string, bool) {
	// A stored cursor wins over any configured seek mode.
	if len(cursor) > 0 {
		return []string{"--after-cursor", cursor}, true
	}

	if mode == SeekSince {
		startAt := time.Now().Add(since)
		return []string{"--since", startAt.Format(sinceTimeFormat)}, true
	}

	if mode == SeekTail {
		return []string{"--since", "now"}, false
	}

	if mode == SeekHead {
		return []string{"--no-tail"}, true
	}

	// Unknown seek mode, that should never happen.
	return []string{}, false
}
// New instantiates and starts a reader for journald logs.
//
// The Reader starts a `journalctl` process with JSON output to read the
// journal entries. Units, syslog identifiers and facilities are passed using
// the corresponding CLI flags (`--unit`, `--identifier`, `--facility`),
// matchers are passed directly to `journalctl` and transports are added as
// matchers using the `_TRANSPORT` key.
//
// `mode` defines the 'seek mode'. It indicates whether the journal should be
// read from the tail, head or from a relative time. If a cursor is passed,
// the seek mode is ignored and reading starts after the cursor.
//
// To start reading from a relative time, use mode: SeekSince; since is a
// time.Duration that is added to the current time to obtain the start time.
//
// file is the journal file to be read, for the system journal use an empty
// string or `LOCAL_SYSTEM_JOURNAL`.
//
// newJctl is the factory used to create the journalctl instance, both here
// and on every later restart.
//
// It's the caller's responsibility to call `Close` on the reader to stop
// the `journalctl` process.
//
// If journalctl cannot be started, an empty Reader and the error are
// returned.
func New(
	logger *logp.Logger,
	canceler input.Canceler,
	units []string,
	syslogIdentifiers []string,
	transports []string,
	matchers journalfield.IncludeMatches,
	facilities []int,
	mode SeekMode,
	cursor string,
	since time.Duration,
	file string,
	newJctl JctlFactory,
) (*Reader, error) {
	logger = logger.Named("reader")

	// Arguments shared by every journalctl invocation.
	baseArgs := []string{"--utc", "--output=json", "--no-pager"}

	switch file {
	case "", localSystemJournalID:
		// Local system journal: no --file argument.
	default:
		baseArgs = append(baseArgs, "--file", file)
	}

	for idx := range units {
		baseArgs = append(baseArgs, "--unit", units[idx])
	}

	for idx := range syslogIdentifiers {
		baseArgs = append(baseArgs, "--identifier", syslogIdentifiers[idx])
	}

	for _, match := range matchers.Matches {
		baseArgs = append(baseArgs, match.String())
	}

	for idx := range transports {
		baseArgs = append(baseArgs, fmt.Sprintf("_TRANSPORT=%s", transports[idx]))
	}

	for idx := range facilities {
		baseArgs = append(baseArgs, "--facility", fmt.Sprintf("%d", facilities[idx]))
	}

	seekArgs, prevBoots := handleSeekAndCursor(mode, since, cursor)

	// Only read old entries first when there might be some from previous
	// boots, otherwise go straight to following.
	initialState := followingState
	if prevBoots {
		initialState = readingOldEntriesState
	}

	reader := &Reader{
		logger:       logger,
		jctlLogger:   logger.Named("journalctl-runner"),
		args:         baseArgs,
		firstRunArgs: seekArgs,
		state:        initialState,
		cursor:       cursor,
		canceler:     canceler,
		jctlFactory:  newJctl,
		backoff:      backoff.NewExpBackoff(canceler.Done(), 100*time.Millisecond, 2*time.Second),
	}

	if err := reader.newJctl(seekArgs...); err != nil {
		return &Reader{}, err
	}

	return reader, nil
}
// newJctl asks the factory for a new journalctl instance and stores it in
// r.jctl. The command line is made of the fixed arguments in r.args followed
// by extraArgs.
//
// The result of the factory is stored even when it returns an error; the
// error is returned to the caller unchanged.
func (r *Reader) newJctl(extraArgs ...string) error {
	// Build the argument list in a fresh slice. Appending directly to r.args
	// could write into the spare capacity of its backing array, making the
	// argument slices handed to the factory on different restarts alias
	// (and overwrite) each other.
	args := make([]string, 0, len(r.args)+len(extraArgs))
	args = append(args, r.args...)
	args = append(args, extraArgs...)

	jctl, err := r.jctlFactory(r.canceler, r.jctlLogger, "journalctl", args...)
	r.jctl = jctl

	return err
}
// Close stops the `journalctl` process by calling Kill on the current
// journalctl instance. The canceller passed to `New` should be cancelled
// before `Close` is called.
//
// Close is a no-op on a Reader that has no journalctl instance, such as the
// empty Reader returned by New when journalctl could not be started.
//
// It returns an error if journalctl could not be stopped.
func (r *Reader) Close() error {
	// Nothing to stop. This check must come before any logging because an
	// empty Reader has no logger either.
	if r.jctl == nil {
		return nil
	}

	r.logger.Infof("shutting down journalctl, waiting up to: %s", time.Minute)

	if err := r.jctl.Kill(); err != nil {
		return fmt.Errorf("error stopping journalctl: %w", err)
	}

	return nil
}
// next reads the next entry from journalctl. It handles any errors from
// journalctl, restarting it as necessary with a backoff strategy.
//
// It returns:
//   - the raw entry and a nil error when an entry was read;
//   - ErrCancelled when cancel is cancelled;
//   - ErrRestarting when journalctl was restarted, the caller is expected to
//     call next again;
//   - any other error when journalctl could not be restarted.
func (r *Reader) next(cancel input.Canceler) ([]byte, error) {
	msg, finished, err := r.jctl.Next(cancel)

	// Check if the input has been cancelled
	select {
	case <-cancel.Done():
		// The caller is responsible for calling Reader.Close to terminate
		// journalctl. Cancelling this canceller only means this Next call was
		// cancelled. Because the input has been cancelled, we ignore the
		// message and any error it might have returned.
		return nil, ErrCancelled
	default:
	}

	// Three options:
	//   - Journalctl finished reading messages from previous boots
	//     successfully, restart it with --follow flag.
	//   - Error, journalctl exited with an error, restart it in the same
	//     mode it was running.
	//   - No error, nothing to restart, return the message.
	var extraArgs []string
	var restart bool

	// First of all: handle the error, if any
	if err != nil {
		r.logger.Warnf("reader error: '%s', restarting...", err)
		restart = true

		if r.cursor != "" {
			// There is a cursor, so just append it to our arguments
			extraArgs = append(extraArgs, "--after-cursor", r.cursor)
		} else {
			// journalctl exited with an error before any message was read.
			// This means we don't have a cursor and need to restart it with
			// the initial arguments, regardless of the state we are in.
			// Otherwise journalctl would be restarted without any seek
			// argument and replay the whole journal.
			extraArgs = append(extraArgs, r.firstRunArgs...)
		}

		// Last, but not least, add "--follow" if we're in following mode
		if r.state == followingState {
			extraArgs = append(extraArgs, "--follow")
		}

		// Handle backoff
		//
		// If the last restart (if any) was more than 5s ago,
		// recreate the backoff and do not wait.
		// We recreate the backoff so r.backoff.Last().IsZero()
		// will return true next time it's called, making us
		// wait in case journalctl crashes in less than 5s.
		if !r.backoff.Last().IsZero() && time.Since(r.backoff.Last()) > 5*time.Second {
			r.backoff = backoff.NewExpBackoff(cancel.Done(), 100*time.Millisecond, 2*time.Second)
		} else {
			r.backoff.Wait()
		}
	}

	// If journalctl finished reading the messages from previous boots
	// and exited successfully
	if finished {
		restart = true
		extraArgs = append(extraArgs, "--follow")
		if r.cursor != "" {
			// If there is a cursor, only use the cursor and the follow argument
			extraArgs = append(extraArgs, "--after-cursor", r.cursor)
		} else {
			// If there is no cursor, it means the first successful run
			// did not return any event, so we have to restart with the
			// --follow and all the initial args.
			extraArgs = append(extraArgs, r.firstRunArgs...)
		}

		r.state = followingState
		r.logger.Info("finished reading journal entries from all boots, restarting journalctl with follow flag")
	}

	// Restart journalctl if needed
	if restart {
		if err := r.newJctl(extraArgs...); err != nil {
			// If we cannot restart journalctl, there is nothing we can do.
			return nil, fmt.Errorf("cannot restart journalctl: %w", err)
		}

		// Return an empty message and wait for the caller to call us again
		return nil, ErrRestarting
	}

	return msg, nil
}
// Next returns the next journal entry. If there is no entry available
// Next will block until there is an entry or cancel is cancelled.
//
// If cancel is cancelled, Next returns a zero value JournalEntry
// and ErrCancelled. Any other error, including a failure to decode the
// entry, is returned together with a zero value JournalEntry.
func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) {
	// r.next returns ErrRestarting when journalctl is restarting,
	// this happens in two situations:
	//   - When the reader first starts, it runs journalctl without the follow
	//     flag to read messages from all previous boots, journalctl exits
	//     once all messages are read.
	//   - journalctl exited unexpectedly and was restarted.
	// In both cases Reader.Next must block until we have a valid journal
	// entry or the input is cancelled, so keep reading while restarting.
	raw, err := r.next(cancel)
	for errors.Is(err, ErrRestarting) {
		raw, err = r.next(cancel)
	}

	if err != nil {
		return JournalEntry{}, err
	}

	return r.handleMessage(raw)
}
// handleMessage decodes a raw JSON journal entry as emitted by journalctl
// and converts it into a JournalEntry.
//
// The entry must contain the `__REALTIME_TIMESTAMP`, `__MONOTONIC_TIMESTAMP`
// and `__CURSOR` keys as strings, the two timestamps being base-10 unsigned
// integers. If the data is not valid JSON, or any of those keys is missing
// or malformed, a zero value JournalEntry and an error are returned.
//
// On success the reader's cursor is updated with the entry's cursor so
// journalctl can be restarted from this entry if needed.
func (r *Reader) handleMessage(msg []byte) (JournalEntry, error) {
	fields := map[string]any{}
	if err := json.Unmarshal(msg, &fields); err != nil {
		r.logger.Error("journal event cannot be parsed as map[string]any, " +
			"look at the events log file for the raw journal event")

		// Log raw data to events log file
		rawEventMsg := fmt.Sprintf("data cannot be parsed as map[string]any. Data: '%s'",
			string(msg))
		r.logger.Errorw(
			rawEventMsg,
			"error.message", err.Error(),
			logp.TypeKey, logp.EventType)

		return JournalEntry{}, fmt.Errorf("cannot decode Journald JSON: %w", err)
	}

	ts, isString := fields["__REALTIME_TIMESTAMP"].(string)
	if !isString {
		return JournalEntry{},
			fmt.Errorf("'__REALTIME_TIMESTAMP': '%[1]v', type %[1]T is not a string",
				fields["__REALTIME_TIMESTAMP"])
	}
	unixTS, err := strconv.ParseUint(ts, 10, 64)
	if err != nil {
		return JournalEntry{},
			fmt.Errorf("could not convert '__REALTIME_TIMESTAMP' to uint64: %w",
				err)
	}

	monotonicTS, isString := fields["__MONOTONIC_TIMESTAMP"].(string)
	if !isString {
		return JournalEntry{},
			fmt.Errorf("'__MONOTONIC_TIMESTAMP': '%[1]v', type %[1]T is not a string",
				fields["__MONOTONIC_TIMESTAMP"])
	}
	monotonicTSInt, err := strconv.ParseUint(monotonicTS, 10, 64)
	if err != nil {
		return JournalEntry{},
			fmt.Errorf("could not convert '__MONOTONIC_TIMESTAMP' to uint64: %w",
				err)
	}

	// The key is `__CURSOR` (two underscores); report that same key and its
	// actual value when it is not a string.
	cursor, isString := fields["__CURSOR"].(string)
	if !isString {
		return JournalEntry{},
			fmt.Errorf("'__CURSOR': '%[1]v', type %[1]T is not a string",
				fields["__CURSOR"])
	}

	// Update our cursor so we can restart journalctl if needed
	r.cursor = cursor

	return JournalEntry{
		Fields:             fields,
		RealtimeTimestamp:  unixTS,
		Cursor:             cursor,
		MonotonicTimestamp: monotonicTSInt,
	}, nil
}