filebeat/beater/filebeat.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"context"
"flag"
"fmt"
"path/filepath"
"strings"
"sync"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/filestream"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/compat"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/beats/v7/libbeat/statestore"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-concert/unison"
// Add filebeat level processors
_ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"
// include all filebeat specific autodiscover features
_ "github.com/elastic/beats/v7/filebeat/autodiscover"
)

const pipelinesWarning = "Filebeat is unable to load the ingest pipelines for the configured" +
" modules because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the ingest pipelines or are using Logstash pipelines, you" +
" can ignore this warning."

var once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")

// Filebeat is a beater object. Contains all objects needed to run the beat
type Filebeat struct {
config *cfg.Config
moduleRegistry *fileset.ModuleRegistry
pluginFactory PluginFactory
done chan struct{}
stopOnce sync.Once // wraps the Stop() method
pipeline beat.PipelineConnector
logger *logp.Logger
}
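
// PluginFactory builds the list of v2 input plugins available to this Filebeat
// instance. The concrete set is supplied by the enclosing binary (OSS and
// x-pack builds register different plugin lists).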
type PluginFactory func(beat.Info, *logp.Logger, statestore.States) []v2.Plugin

// New returns a beat.Creator that builds a new Filebeat instance.
func New(plugins PluginFactory) beat.Creator {
return func(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
return newBeater(b, plugins, rawConfig)
}
}
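
// A minimal sketch of how this constructor is typically wired up (illustrative;
// the actual wiring lives in filebeat/cmd, and the plugin list comes from the
// enclosing binary):
//
//	settings := instance.Settings{Name: "filebeat"}
//	rootCmd := cmd.GenRootCmdWithSettings(beater.New(pluginList), settings)
//	// rootCmd.Execute() then runs the beat.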
func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Beater, error) {
config := cfg.DefaultConfig
if err := rawConfig.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %w", err) //nolint:staticcheck //Keep old behavior
}
if err := cfgwarn.CheckRemoved6xSettings(
rawConfig,
"prospectors",
"config.prospectors",
"registry_file",
"registry_file_permissions",
"registry_flush",
); err != nil {
return nil, err
}
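	// Note: these override settings are typically set by Elastic Agent rather
	// than by end users; the -1 index reads each setting as a plain
	// (non-array) field.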
enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1)
forceEnableModuleFilesets, _ := b.BeatConfig.Bool("config.modules.force_enable_module_filesets", -1)
filesetOverrides := fileset.FilesetOverrides{
EnableAllFilesets: enableAllFilesets,
ForceEnableModuleFilesets: forceEnableModuleFilesets,
}
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, filesetOverrides)
if err != nil {
return nil, err
}
moduleInputs, err := moduleRegistry.GetInputConfigs()
if err != nil {
return nil, err
}
if err := config.FetchConfigs(); err != nil {
return nil, err
}
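	// Expose per-input metrics on the HTTP monitoring endpoint when it is enabled.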
if b.API != nil {
if err = inputmon.AttachHandler(b.API.Router(), b.Info.Monitoring.NamespaceRegistry()); err != nil {
return nil, fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err)
}
}
if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON(b.Info.Monitoring.NamespaceRegistry())
if err != nil {
b.Info.Logger.Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
})
b.Manager.RegisterDiagnosticHook(
"registry",
"Filebeat's registry",
"registry.tar.gz",
"application/octet-stream",
gzipRegistry)
}
// Add inputs created by the modules
config.Inputs = append(config.Inputs, moduleInputs...)
	enabledInputs := config.ListEnabledInputs()
	haveEnabledInputs := len(enabledInputs) > 0
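	// Without any enabled input or module, and without any mechanism that can
	// deliver configuration at runtime (config reloading, autodiscover, central
	// management), Filebeat has nothing to do.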
if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.Manager.Enabled() {
if !b.InSetupCmd {
return nil, fmt.Errorf("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?")
}
// in the `setup` command, log this only as a warning
b.Info.Logger.Warn("Setup called, but no modules enabled.")
}
if *once && config.ConfigInput.Enabled() && config.ConfigModules.Enabled() {
return nil, fmt.Errorf("input configs and --once cannot be used together")
}
if config.IsInputEnabled("stdin") && len(enabledInputs) > 1 {
return nil, fmt.Errorf("stdin requires to be run in exclusive mode, configured inputs: %s", strings.Join(enabledInputs, ", "))
}
fb := &Filebeat{
done: make(chan struct{}),
config: &config,
moduleRegistry: moduleRegistry,
pluginFactory: plugins,
logger: b.Info.Logger,
}
err = fb.setupPipelineLoaderCallback(b)
if err != nil {
return nil, err
}
return fb, nil
}

// setupPipelineLoaderCallback sets the callback function for loading pipelines during setup.
func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" && !b.Manager.Enabled() {
fb.logger.Warn(pipelinesWarning)
return nil
}
overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *conf.C) error {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat")
if err != nil {
return err
}
		// When running the `setup` subcommand, configurations from the modules.d
		// directory have to be loaded using cfg.Reloader; otherwise they are skipped.
pipelineLoaderFactory := newPipelineLoaderFactory(ctx, b.Config.Output.Config())
enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1)
forceEnableModuleFilesets, _ := b.BeatConfig.Bool("config.modules.force_enable_module_filesets", -1)
filesetOverrides := fileset.FilesetOverrides{
EnableAllFilesets: enableAllFilesets,
ForceEnableModuleFilesets: forceEnableModuleFilesets,
}
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, filesetOverrides)
if fb.config.ConfigModules.Enabled() {
if enableAllFilesets {
// All module configs need to be loaded to enable all the filesets
// contained in the modules. The default glob just loads the enabled
// ones. Switching the glob pattern from *.yml to * achieves this.
origPath, _ := fb.config.ConfigModules.String("path", -1)
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
return nil
}

// loadModulesPipelines is called when modules are configured to do the initial
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" {
fb.logger.Warn(pipelinesWarning)
return nil
}
overwritePipelines := fb.config.OverwritePipelines
if b.InSetupCmd {
overwritePipelines = true
}
// register pipeline loading to happen every time a new ES connection is
// established
callback := func(esClient *eslegclient.Connection) error {
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
_, err := elasticsearch.RegisterConnectCallback(callback)
return err
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config
if !fb.moduleRegistry.Empty() {
err = fb.loadModulesPipelines(b)
if err != nil {
return err
}
}
waitFinished := newSignalWait()
waitEvents := newSignalWait()
// count active events for waiting on shutdown
var reg *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats")
if reg == nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats")
}
}
wgEvents := &eventCounter{
count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(reg, "filebeat.events.added"),
done: monitoring.NewUint(reg, "filebeat.events.done"),
}
finishedLogger := newFinishedLogger(wgEvents)
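	// Migrate any registry file written in an older format before the state
	// store is opened.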
registryMigrator := registrar.NewMigrator(config.Registry, fb.logger)
if err := registryMigrator.Run(); err != nil {
fb.logger.Errorf("Failed to migrate registry file: %+v", err)
return err
}
// Use context, like normal people do, hooking up to the beat.done channel
ctx, cn := context.WithCancel(context.Background())
go func() {
<-fb.done
cn()
}()
stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry)
if err != nil {
fb.logger.Errorf("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()
	// If the notifier is set, configure a listener for output configuration changes.
	// The notifier passes the Elasticsearch output configuration down to the
	// Elasticsearch-backed state storage so that it can fully configure itself.
if stateStore.notifier != nil {
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
fb.logger.Errorf("Failed to unpack the output config: %v", err)
return nil
}
// Create a new config with the output configuration. Since r.Config is a pointer, a copy is required to
// avoid concurrent map read and write.
// See https://github.com/elastic/beats/issues/42815
configCopy, err := conf.NewConfigFrom(outCfg.Config())
if err != nil {
fb.logger.Errorf("Failed to create a new config from the output config: %v", err)
return nil
}
stateStore.notifier.Notify(configCopy)
return nil
})
}
err = filestream.ValidateInputIDs(config.Inputs, fb.logger.Named("input.filestream"))
if err != nil {
fb.logger.Errorf("invalid filestream configuration: %+v", err)
return err
}
// Setup registrar to persist state
registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout)
if err != nil {
fb.logger.Errorf("Could not init registrar: %v", err)
return err
}
	// Make sure all events that were published are also acknowledged to the
	// registrar before it is closed.
registrarChannel := newRegistrarLogger(registrar)
	// Set up event counting for startup and a global common ACKer, such that all
	// events are routed to the registrar after they have been ACKed.
	// Events with Private==nil or with a Private field that is not a file.State
	// are forwarded directly to `finishedLogger`. Events from the `logs` input are
	// first forwarded to the registrar via `registrarChannel`, which finally
	// forwards them to finishedLogger as well.
	// The finishedLogger decrements the counters in wgEvents after all events have
	// been securely processed by the registry.
fb.pipeline = withPipelineEventCounter(b.Publisher, wgEvents)
fb.pipeline = pipetool.WithACKer(fb.pipeline, eventACKer(finishedLogger, registrarChannel))
	// Filebeat requires infinite retry by default. Let's configure this for all
	// inputs by default. Inputs (and InputController) can override the sending
	// guarantees explicitly when connecting to the pipeline.
fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend)
outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone).Create
inputsLogger := fb.logger.Named("input")
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore)
v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
if err != nil {
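		// NewLoader fails only on invalid plugin definitions, which is a
		// programmer error rather than a runtime condition.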
panic(err) // loader detected invalid state.
}
var inputTaskGroup unison.TaskGroup
defer func() {
_ = inputTaskGroup.Stop()
}()
// Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
fb.logger.Errorf("Failed to initialize the input managers: %v", err)
return err
}
inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, compat.Combine(
compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader),
input.NewRunnerFactory(pipelineConnector, registrar, fb.done, fb.logger),
))
	// Create an ES connection factory for dynamic module pipeline loading.
var pipelineLoaderFactory fileset.PipelineLoaderFactory
	// The pipelineFactory needs a context to control its connections to ES; once
	// the pipelineFactory/ESClient are no longer needed, the context must be
	// cancelled. This pipeline factory is used by the moduleLoader, which is run
	// by a crawler; whenever the crawler is stopped we also cancel the context.
pipelineFactoryCtx, cancelPipelineFactoryCtx := context.WithCancel(context.Background())
defer cancelPipelineFactoryCtx()
if b.Config.Output.Name() == "elasticsearch" {
pipelineLoaderFactory = newPipelineLoaderFactory(pipelineFactoryCtx, b.Config.Output.Config())
} else {
fb.logger.Warn(pipelinesWarning)
}
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger)
if err != nil {
fb.logger.Errorf("Could not init crawler: %v", err)
return err
}
	// The order of starting and stopping is important: stopping happens in the
	// reverse order of starting.
	// The current start order is: registrar, publisher, spooler, crawler.
	// That means the crawler is stopped first.
// Start the registrar
err = registrar.Start()
if err != nil {
return fmt.Errorf("Could not start registrar: %w", err) //nolint:staticcheck //Keep old behavior
}
// Stopping registrar will write last state
defer registrar.Stop()
// Stopping publisher (might potentially drop items)
defer func() {
		// Close the registrar logger first to make sure no more events arrive at
		// the registrar. registrarChannel must be closed first to potentially
		// unblock (pretty unlikely) the publisher.
registrarChannel.Close()
close(outDone) // finally close all active connections to publisher pipeline
}()
// Wait for all events to be processed or timeout
defer waitEvents.Wait()
if config.OverwritePipelines {
fb.logger.Debug("modules", "Existing Ingest pipelines will be updated")
}
err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
cancelPipelineFactoryCtx()
return fmt.Errorf("Failed to start crawler: %w", err) //nolint:staticcheck //Keep old behavior
}
// If run once, add crawler completion check as alternative to done signal
if *once {
runOnce := func() {
fb.logger.Info("Running filebeat once. Waiting for completion ...")
crawler.WaitForCompletion()
fb.logger.Info("All data collection completed. Shutting down.")
}
waitFinished.Add(runOnce)
}
// Register reloadable list of inputs and modules
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline, fb.logger)
b.Registry.MustRegisterInput(inputs)
modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline, fb.logger)
var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adiscover, err = autodiscover.NewAutodiscover(
"filebeat",
fb.pipeline,
cfgfile.MultiplexedRunnerFactory(
cfgfile.MatchHasField("module", moduleLoader),
cfgfile.MatchDefault(inputLoader),
),
autodiscover.QueryConfig(),
config.Autodiscover,
b.Keystore,
fb.logger,
)
if err != nil {
return err
}
}
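	// Note: Start is safe to call on a nil *Autodiscover; it is a no-op when
	// autodiscover is not configured.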
adiscover.Start()
	// We start the manager when all the subsystems are initialized and ready to
	// receive events.
if err := b.Manager.Start(); err != nil {
return err
}
// Add done channel to wait for shutdown signal
waitFinished.AddChan(fb.done)
waitFinished.Wait()
// Stop reloadable lists, autodiscover -> Stop crawler -> stop inputs -> stop harvesters
// Note: waiting for crawlers to stop here in order to install wgEvents.Wait
// after all events have been enqueued for publishing. Otherwise wgEvents.Wait
// or publisher might panic due to concurrent updates.
inputs.Stop()
modules.Stop()
adiscover.Stop()
crawler.Stop()
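	// With the crawler stopped, ES clients created through the pipeline factory
	// are no longer needed.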
cancelPipelineFactoryCtx()
timeout := fb.config.ShutdownTimeout
	// Check whether shutdown should wait for all events to be published.
waitPublished := fb.config.ShutdownTimeout > 0 || *once
if waitPublished {
// Wait for registrar to finish writing registry
waitEvents.Add(withLog(wgEvents.Wait,
"Continue shutdown: All enqueued events being published."))
// Wait for either timeout or all events having been ACKed by outputs.
if fb.config.ShutdownTimeout > 0 {
fb.logger.Info("Shutdown output timer started. Waiting for max %v.", timeout)
waitEvents.Add(withLog(waitDuration(timeout),
"Continue shutdown: Time out waiting for events being published."))
} else {
waitEvents.AddChan(fb.done)
}
}
// Stop the manager and stop the connection to any dependent services.
// The Manager started to have a working implementation when
// https://github.com/elastic/beats/pull/34416 was merged.
// This is intended to enable TLS certificates reload on a long
// running Beat.
//
// However calling b.Manager.Stop() here messes up the behavior of the
// --once flag because it makes Filebeat exit early.
// So if --once is passed, we don't call b.Manager.Stop().
if !*once {
b.Manager.Stop()
}
return nil
}

// Stop is called on exit to stop the crawling, spooling and registration processes.
func (fb *Filebeat) Stop() {
fb.logger.Info("Stopping filebeat")
// Stop Filebeat
fb.stopOnce.Do(func() { close(fb.done) })
}

// newPipelineLoaderFactory creates a new pipeline loader (ES client) factory.
func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat")
if err != nil {
return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err) //nolint:staticcheck //Keep old behavior
}
return esClient, nil
}
return pipelineLoaderFactory
}

// fetchInputConfiguration fetches all input configurations defined at Filebeat
// startup, including external configuration files.
func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) {
if len(config.Inputs) == 0 {
inputs = []*conf.C{}
} else {
inputs = config.Inputs
}
// reading external input configuration if defined
var dynamicInputCfg cfgfile.DynamicConfig
if config.ConfigInput != nil {
err = config.ConfigInput.Unpack(&dynamicInputCfg)
if err != nil {
return nil, fmt.Errorf("failed to unpack the dynamic input configuration: %w", err)
}
}
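	// An empty path means no external input configuration is defined.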
if dynamicInputCfg.Path == "" {
return inputs, nil
}
cfgPaths, err := filepath.Glob(dynamicInputCfg.Path)
if err != nil {
return nil, fmt.Errorf("failed to resolve external input configuration paths: %w", err)
}
if len(cfgPaths) == 0 {
return inputs, nil
}
// making a copy so we can safely extend the slice
inputs = make([]*conf.C, len(config.Inputs))
copy(inputs, config.Inputs)
for _, p := range cfgPaths {
externalInputs, err := cfgfile.LoadList(p)
if err != nil {
return nil, fmt.Errorf("failed to load external input configuration: %w", err)
}
inputs = append(inputs, externalInputs...)
}
return inputs, nil
}