filebeat/input/log/harvester.go (470 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package log harvests different inputs for new information. Currently
// two harvester types exist:
//
// - log
//
// - stdin
//
// The log harvester reads a file line by line. In case the end of a file is found
// with an incomplete line, the line pointer stays at the beginning of the incomplete
// line. As soon as the line is completed, it is read and returned.
//
// The stdin harvesters reads data from stdin.
package log
import (
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/gofrs/uuid/v5"
"golang.org/x/text/transform"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
file_helper "github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/debug"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
"github.com/elastic/beats/v7/libbeat/reader/readjson"
conf "github.com/elastic/elastic-agent-libs/config"
)
var (
harvesterMetrics = monitoring.Default.NewRegistry("filebeat.harvester")
filesMetrics = monitoring.GetNamespace("dataset").GetRegistry()
harvesterStarted = monitoring.NewInt(harvesterMetrics, "started")
harvesterClosed = monitoring.NewInt(harvesterMetrics, "closed")
harvesterRunning = monitoring.NewInt(harvesterMetrics, "running")
harvesterOpenFiles = monitoring.NewInt(harvesterMetrics, "open_files")
ErrFileTruncate = errors.New("detected file being truncated")
ErrRenamed = errors.New("file was renamed")
ErrRemoved = errors.New("file was removed")
ErrInactive = errors.New("file inactive")
ErrClosed = errors.New("reader closed")
)
// OutletFactory provides an outlet for the harvester
type OutletFactory func() channel.Outleter
// Harvester contains all harvester related data
type Harvester struct {
logger *logp.Logger
id uuid.UUID
config config
source harvester.Source // the source being watched
// shutdown handling
done chan struct{}
doneWg *sync.WaitGroup
stopOnce sync.Once
stopWg *sync.WaitGroup
stopLock sync.Mutex
// internal harvester state
state file.State
states *file.States
log *Log
// file reader pipeline
reader reader.Reader
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
// event/state publishing
outletFactory OutletFactory
publishState func(file.State) bool
metrics *harvesterProgressMetrics
onTerminate func()
}
// stores the metrics of the harvester
type harvesterProgressMetrics struct {
metricsRegistry *monitoring.Registry
filename *monitoring.String
started *monitoring.String
lastPublished *monitoring.Timestamp
lastPublishedEventTimestamp *monitoring.Timestamp
currentSize *monitoring.Int
readOffset *monitoring.Int
}
// NewHarvester creates a new harvester
func NewHarvester(
logger *logp.Logger,
config *conf.C,
state file.State,
states *file.States,
publishState func(file.State) bool,
outletFactory OutletFactory,
) (*Harvester, error) {
id, err := uuid.NewV4()
if err != nil {
return nil, err
}
logger = logger.Named("harvester").With("harvester_id", id)
h := &Harvester{
logger: logger,
config: defaultConfig(),
state: state,
states: states,
publishState: publishState,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
doneWg: &sync.WaitGroup{},
id: id,
outletFactory: outletFactory,
}
if err := config.Unpack(&h.config); err != nil {
return nil, err
}
encodingFactory, ok := encoding.FindEncoding(h.config.Encoding)
if !ok || encodingFactory == nil {
return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
}
h.encodingFactory = encodingFactory
// Add ttl if clean_inactive is set
if h.config.CleanInactive > 0 {
h.state.TTL = h.config.CleanInactive
}
// Add outlet signal so harvester can also stop itself
return h, nil
}
// open does open the file given under h.Path and assigns the file handler to h.log
func (h *Harvester) open() error {
switch h.config.Type {
case harvester.StdinType:
return h.openStdin()
case harvester.LogType, harvester.DockerType, harvester.ContainerType:
return h.openFile()
default:
return fmt.Errorf("Invalid harvester type: %+v", h.config)
}
}
// ID returns the unique harvester identifier
func (h *Harvester) ID() uuid.UUID {
return h.id
}
// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() error {
err := h.open()
if err != nil {
return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %w", err)
}
h.reader, err = h.newLogFileReader()
if err != nil {
if h.source != nil {
h.source.Close()
}
return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %w", err)
}
h.metrics = newHarvesterProgressMetrics(h.id.String())
h.metrics.filename.Set(h.source.Name())
h.metrics.started.Set(common.Time(time.Now()).String())
h.metrics.readOffset.Set(h.state.Offset)
err = h.updateCurrentSize()
if err != nil {
return err
}
h.logger.Debugf("Harvester setup successful. Line terminator: %d", h.config.LineTerminator)
return nil
}
func newHarvesterProgressMetrics(id string) *harvesterProgressMetrics {
r := filesMetrics.NewRegistry(id)
return &harvesterProgressMetrics{
metricsRegistry: r,
filename: monitoring.NewString(r, "name"),
started: monitoring.NewString(r, "start_time"),
lastPublished: monitoring.NewTimestamp(r, "last_event_published_time"),
lastPublishedEventTimestamp: monitoring.NewTimestamp(r, "last_event_timestamp"),
currentSize: monitoring.NewInt(r, "size"),
readOffset: monitoring.NewInt(r, "read_offset"),
}
}
func (h *Harvester) updateCurrentSize() error {
fInfo, err := h.source.Stat()
if err != nil {
return err
}
h.metrics.currentSize.Set(fInfo.Size())
return nil
}
// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
logger := h.logger
// Allow for some cleanup on termination
if h.onTerminate != nil {
defer h.onTerminate()
}
outlet := channel.CloseOnSignal(h.outletFactory(), h.done)
forwarder := harvester.NewForwarder(outlet)
// This is to make sure a harvester is not started anymore if stop was already
// called before the harvester was started. The waitgroup is not incremented afterwards
// as otherwise it could happened that between checking for the close channel and incrementing
// the waitgroup, the harvester could be stopped.
// Here stopLock is used to prevent a data race where stopWg.Add(1) below is called
// while stopWg.Wait() is executing in a different goroutine, which is forbidden
// according to sync.WaitGroup docs.
h.stopLock.Lock()
h.stopWg.Add(1)
h.stopLock.Unlock()
select {
case <-h.done:
h.stopWg.Done()
return nil
default:
}
defer func() {
// Channel to stop internal harvester routines
h.stop()
// Makes sure file is properly closed when the harvester is stopped
h.cleanup()
harvesterRunning.Add(-1)
// Marks harvester stopping completed
h.stopWg.Done()
}()
harvesterStarted.Add(1)
harvesterRunning.Add(1)
// Closes reader after timeout or when done channel is closed
// This routine is also responsible to properly stop the reader
go func(source string) {
closeTimeout := make(<-chan time.Time)
// starts close_timeout timer
if h.config.CloseTimeout > 0 {
closeTimeout = time.After(h.config.CloseTimeout)
}
select {
// Applies when timeout is reached
case <-closeTimeout:
logger.Infof("Closing harvester because close_timeout was reached: %s", source)
// Required when reader loop returns and reader finished
case <-h.done:
}
h.stop()
err := h.reader.Close()
if err != nil {
logger.Errorf("Failed to stop harvester for file: %v", err)
}
}(h.state.Source)
logger.Infof("Harvester started for paths: %v", h.config.Paths)
h.doneWg.Add(1)
go func() {
h.monitorFileSize()
h.doneWg.Done()
}()
for {
select {
case <-h.done:
return nil
default:
}
message, err := h.reader.Next()
if err != nil {
switch {
case errors.Is(err, ErrFileTruncate):
logger.Info("File was truncated. Begin reading file from offset 0.")
h.state.Offset = 0
filesTruncated.Add(1)
case errors.Is(err, ErrRemoved):
logger.Info("File was removed. Closing because close_removed is enabled.")
case errors.Is(err, ErrRenamed):
logger.Info("File was renamed. Closing because close_renamed is enabled.")
case errors.Is(err, ErrClosed):
logger.Info("Reader was closed. Closing.")
case errors.Is(err, io.EOF):
logger.Info("End of file reached. Closing because close_eof is enabled.")
case errors.Is(err, ErrInactive):
logger.Infof("File is inactive. Closing because close_inactive of %v reached.", h.config.CloseInactive)
default:
logger.Errorf("Read line error: %v", err)
}
return nil
}
// Get copy of state to work on
// This is important in case sending is not successful so on shutdown
// the old offset is reported
state := h.getState()
startingOffset := state.Offset
state.Offset += int64(message.Bytes)
// Stop harvester in case of an error
if !h.onMessage(forwarder, state, message, startingOffset) {
return nil
}
// Update state of harvester as successfully sent
h.state = state
// Update metics of harvester as event was sent
h.metrics.readOffset.Set(state.Offset)
h.metrics.lastPublished.Set(time.Now())
h.metrics.lastPublishedEventTimestamp.Set(message.Ts)
}
}
func (h *Harvester) monitorFileSize() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-h.done:
return
case <-ticker.C:
err := h.updateCurrentSize()
if err != nil {
h.logger.Errorf("Error updating file size: %v", err)
}
}
}
}
// stop is intended for internal use and closed the done channel to stop execution
func (h *Harvester) stop() {
h.stopOnce.Do(func() {
close(h.done)
// Wait for goroutines monitoring h.done to terminate before closing source.
h.doneWg.Wait()
filesMetrics.Remove(h.id.String())
})
}
// Stop stops harvester and waits for completion
func (h *Harvester) Stop() {
h.stop()
// Prevent stopWg.Wait() to be called at the same time as stopWg.Add(1)
h.stopLock.Lock()
h.stopWg.Wait()
h.stopLock.Unlock()
}
// onMessage processes a new message read from the reader.
// This results in a state update and possibly an event would be send.
// A state update first updates the in memory state held by the prospector,
// and finally sends the file.State indirectly to the registrar.
// The events Private field is used to forward the file state update.
//
// onMessage returns 'false' if it was interrupted in the process of sending the event.
// This normally signals a harvester shutdown.
func (h *Harvester) onMessage(
forwarder *harvester.Forwarder,
state file.State,
message reader.Message,
messageOffset int64,
) bool {
if h.source.HasState() {
h.states.Update(state)
}
text := string(message.Content)
if message.IsEmpty() || !h.shouldExportLine(text) {
// No data or event is filtered out -> send empty event with state update
// only. The call can fail on filebeat shutdown.
// The event will be filtered out, but forwarded to the registry as is.
err := forwarder.Send(beat.Event{Private: state})
return err == nil
}
fields := mapstr.M{
"log": mapstr.M{
"offset": messageOffset, // Offset here is the offset before the starting char.
"file": mapstr.M{
"path": state.Source,
},
},
}
fields.DeepUpdate(message.Fields)
// Check if json fields exist
var jsonFields mapstr.M
if f, ok := fields["json"]; ok {
jsonFields = f.(mapstr.M)
}
var meta mapstr.M
timestamp := message.Ts
if h.config.JSON != nil && len(jsonFields) > 0 {
id, ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
if !ts.IsZero() {
// there was a `@timestamp` key in the event, so overwrite
// the resulting timestamp
timestamp = ts
}
if id != "" {
meta = mapstr.M{
"_id": id,
}
}
} else if len(text) != 0 {
fields["message"] = text
}
err := forwarder.Send(beat.Event{
Timestamp: timestamp,
Fields: fields,
Meta: meta,
Private: state,
})
return err == nil
}
// SendStateUpdate send an empty event with the current state to update the registry
// close_timeout does not apply here to make sure a harvester is closed properly. In
// case the output is blocked the harvester will stay open to make sure no new harvester
// is started. As soon as the output becomes available again, the finished state is written
// and processing can continue.
func (h *Harvester) SendStateUpdate() {
if !h.source.HasState() {
return
}
h.publishState(h.state)
h.logger.Debugf("Update state (offset: %v).", h.state.Offset)
h.states.Update(h.state)
}
// shouldExportLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (h *Harvester) shouldExportLine(line string) bool {
if len(h.config.IncludeLines) > 0 {
if !harvester.MatchAny(h.config.IncludeLines, line) {
// drop line
h.logger.Debugf("Drop line as it does not match any of the include patterns %s", line)
return false
}
}
if len(h.config.ExcludeLines) > 0 {
if harvester.MatchAny(h.config.ExcludeLines, line) {
// drop line
h.logger.Debugf("Drop line as it does match one of the exclude patterns%s", line)
return false
}
}
return true
}
// openFile opens a file and checks for the encoding. In case the encoding cannot be detected
// or the file cannot be opened because for example of failing read permissions, an error
// is returned and the harvester is closed. The file will be picked up again the next time
// the file system is scanned
func (h *Harvester) openFile() error {
fi, err := os.Stat(h.state.Source)
if err != nil {
return fmt.Errorf("failed to stat source file %s: %w", h.state.Source, err)
}
if fi.Mode()&os.ModeNamedPipe != 0 {
return fmt.Errorf("failed to open file %s, named pipes are not supported", h.state.Source)
}
f, err := file_helper.ReadOpen(h.state.Source)
if err != nil {
return fmt.Errorf("Failed opening %s: %w", h.state.Source, err)
}
harvesterOpenFiles.Add(1)
// Makes sure file handler is also closed on errors
err = h.validateFile(f)
if err != nil {
f.Close()
harvesterOpenFiles.Add(-1)
return err
}
h.source = File{File: f}
return nil
}
func (h *Harvester) validateFile(f *os.File) error {
logger := h.logger
info, err := f.Stat()
if err != nil {
return fmt.Errorf("Failed getting stats for file %s: %w", h.state.Source, err)
}
if !info.Mode().IsRegular() {
return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name())
}
// Compares the stat of the opened file to the state given by the input. Abort if not match.
if !os.SameFile(h.state.Fileinfo, info) {
return errors.New("file info is not identical with opened file. Aborting harvesting and retrying file later again")
}
h.encoding, err = h.encodingFactory(f)
if err != nil {
if errors.Is(err, transform.ErrShortSrc) {
logger.Infof("Initialising encoding for '%v' failed due to file being too short", f)
} else {
logger.Errorf("Initialising encoding for '%v' failed: %v", f, err)
}
return err
}
// get file offset. Only update offset if no error
offset, err := h.initFileOffset(f)
if err != nil {
return err
}
logger.Debugf("Setting offset: %d ", offset)
h.state.Offset = offset
return nil
}
func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
// continue from last known offset
if h.state.Offset > 0 {
h.logger.Debugf("Set previous offset: %d ", h.state.Offset)
return file.Seek(h.state.Offset, io.SeekStart)
}
// get offset from file in case of encoding factory was required to read some data.
h.logger.Debug("Setting offset to: 0")
return file.Seek(0, io.SeekCurrent)
}
// getState returns an updated copy of the harvester state
func (h *Harvester) getState() file.State {
if !h.source.HasState() {
return file.State{}
}
state := h.state
// refreshes the values in State with the values from the harvester itself
state.FileStateOS = file_helper.GetOSState(h.state.Fileinfo)
return state
}
func (h *Harvester) cleanup() {
// Mark harvester as finished
h.state.Finished = true
h.logger.Debugf("Stopping harvester.")
defer h.logger.Debugf("harvester cleanup finished.")
// Make sure file is closed as soon as harvester exits
// If file was never opened, it can't be closed
if h.source != nil {
// close file handler
h.source.Close()
h.logger.Debugf("Closing file")
harvesterOpenFiles.Add(-1)
// On completion, push offset so we can continue where we left off if we relaunch on the same file
// Only send offset if file object was created successfully
h.SendStateUpdate()
} else {
h.logger.Warn("Stopping harvester, NOT closing file as file info not available.")
}
harvesterClosed.Add(1)
}
// newLogFileReader creates a new reader to read log files
//
// It creates a chain of readers which looks as following:
//
// limit -> (multiline -> timeout) -> strip_newline -> json -> encode -> line -> log_file
//
// Each reader on the left, contains the reader on the right and calls `Next()` to fetch more data.
// At the base of all readers the the log_file reader. That means in the data is flowing in the opposite direction:
//
// log_file -> line -> encode -> json -> strip_newline -> (timeout -> multiline) -> limit
//
// log_file implements io.Reader interface and encode reader is an adapter for io.Reader to
// reader.Reader also handling file encodings. All other readers implement reader.Reader
func (h *Harvester) newLogFileReader() (reader.Reader, error) {
var r reader.Reader
var err error
h.logger.Debugf("newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)
// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
h.log, err = NewLog(h.logger, h.source, h.config.LogConfig)
if err != nil {
return nil, err
}
reader, err := debug.AppendReaders(h.log)
if err != nil {
return nil, err
}
// Configure MaxBytes limit for EncodeReader as multiplied by 4
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4
r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
BufferSize: h.config.BufferSize,
Terminator: h.config.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
return nil, err
}
if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.Format, h.config.DockerJSON.CRIFlags)
}
if h.config.JSON != nil {
r = readjson.NewJSONReader(r, h.config.JSON)
}
r = readfile.NewStripNewline(r, h.config.LineTerminator)
if h.config.Multiline != nil {
r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline)
if err != nil {
return nil, err
}
}
return readfile.NewLimitReader(r, h.config.MaxBytes), nil
}