libbeat/cmd/instance/beat.go (1,314 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package instance import ( "context" "encoding/json" "errors" "flag" "fmt" "io" "net" "os" "os/user" "runtime" "runtime/debug" "strconv" "strings" "sync" "time" "github.com/gofrs/uuid/v5" "go.opentelemetry.io/collector/consumer" "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/asset" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/cloudid" "github.com/elastic/beats/v7/libbeat/cmd/instance/locks" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fleetmode" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/common/seccomp" "github.com/elastic/beats/v7/libbeat/dashboards" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/features" "github.com/elastic/beats/v7/libbeat/idxmgmt" "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/kibana" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/report" "github.com/elastic/beats/v7/libbeat/monitoring/report/log" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/plugin" "github.com/elastic/beats/v7/libbeat/pprof" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" "github.com/elastic/elastic-agent-libs/filewatcher" "github.com/elastic/elastic-agent-libs/keystore" kbn "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/configure" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/report/buffer" "github.com/elastic/elastic-agent-libs/paths" svc "github.com/elastic/elastic-agent-libs/service" "github.com/elastic/elastic-agent-libs/transport/tlscommon" libversion "github.com/elastic/elastic-agent-libs/version" "github.com/elastic/elastic-agent-system-metrics/metric/system/host" metricreport "github.com/elastic/elastic-agent-system-metrics/report" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" "github.com/elastic/go-ucfg" ) // Beat provides the runnable and configurable instance of a beat. type Beat struct { beat.Beat Config beatConfig RawConfig *config.C // Raw config that can be unpacked to get Beat specific config data. IdxSupporter idxmgmt.Supporter keystore keystore.Keystore processors processing.Supporter InputQueueSize int // Size of the producer queue used by most queues. // shouldReexec is a flag to indicate the Beat should restart shouldReexec bool } type beatConfig struct { beat.BeatConfig `config:",inline"` // instance internal configs // beat top-level settings Name string `config:"name"` MaxProcs int `config:"max_procs"` GCPercent int `config:"gc_percent"` Seccomp *config.C `config:"seccomp"` Features *config.C `config:"features"` // beat internal components configurations HTTP *config.C `config:"http"` HTTPPprof *pprof.Config `config:"http.pprof"` BufferConfig *config.C `config:"http.buffer"` Path paths.Path `config:"path"` Logging *config.C `config:"logging"` EventLogging *config.C `config:"logging.event_data"` MetricLogging *config.C `config:"logging.metrics"` Keystore *config.C `config:"keystore"` Instrumentation instrumentation.Config `config:"instrumentation"` // output/publishing related configurations Pipeline pipeline.Config `config:",inline"` // monitoring settings MonitoringBeatConfig monitoring.BeatConfig `config:",inline"` // ILM settings LifecycleConfig lifecycle.RawConfig `config:",inline"` // central management settings Management *config.C `config:"management"` // elastic stack 'setup' configurations Dashboards *config.C `config:"setup.dashboards"` Kibana *config.C `config:"setup.kibana"` // Migration config to migration from 6 to 7 Migration *config.C `config:"migration.6_to_7"` // TimestampPrecision sets the precision of all timestamps in the Beat. TimestampPrecision *config.C `config:"timestamp"` } type certReloadConfig struct { tlscommon.Config `config:",inline" yaml:",inline"` Reload cfgfile.Reload `config:"restart_on_cert_change" yaml:"restart_on_cert_change"` } func (c certReloadConfig) Validate() error { if c.Reload.Period < time.Second { return errors.New("'restart_on_cert_change.period' must be equal or greather than 1s") } if c.Reload.Enabled && runtime.GOOS == "windows" { return errors.New("'restart_on_cert_change' is not supported on Windows") } return nil } func defaultCertReloadConfig() certReloadConfig { return certReloadConfig{ Reload: cfgfile.Reload{ Enabled: false, Period: time.Minute, }, } } // Run initializes and runs a Beater implementation. name is the name of the // Beat (e.g. packetbeat or metricbeat). version is version number of the Beater // implementation. bt is the `Creator` callback for creating a new beater // instance. // XXX Move this as a *Beat method? func Run(settings Settings, bt beat.Creator) error { return handleError(func() error { defer func() { if r := recover(); r != nil { logp.NewLogger(settings.Name).Fatalw("Failed due to panic.", "panic", r, zap.Stack("stack")) } }() b, err := NewInitializedBeat(settings) if err != nil { return err } return b.launch(settings, bt) }()) } // NewInitializedBeat creates a new beat where all information and initialization is derived from settings func NewInitializedBeat(settings Settings) (*Beat, error) { b, err := NewBeat(settings.Name, settings.IndexPrefix, settings.Version, settings.ElasticLicensed, settings.Initialize) if err != nil { return nil, err } if err := b.InitWithSettings(settings); err != nil { return nil, err } return b, nil } // NewBeat creates a new beat instance func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func()) (*Beat, error) { // call all initialization functions for _, f := range initFuncs { f() } if v == "" { v = version.GetDefaultVersion() } if indexPrefix == "" { indexPrefix = name } hostname, err := os.Hostname() if err != nil { return nil, err } fields, err := asset.GetFields(name) if err != nil { return nil, err } id, err := uuid.NewV4() if err != nil { return nil, err } b := beat.Beat{ Info: beat.Info{ Beat: name, ElasticLicensed: elasticLicensed, IndexPrefix: indexPrefix, Version: v, Name: hostname, Hostname: hostname, ID: id, FirstStart: time.Now(), StartTime: time.Now(), EphemeralID: metricreport.EphemeralID(), FIPSDistribution: version.FIPSDistribution, }, Fields: fields, Registry: reload.NewRegistry(), } return &Beat{Beat: b}, nil } // NewBeatReceiver creates a Beat that will be used in the context of an otel receiver func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*Beat, error) { b, err := NewBeat(settings.Name, settings.IndexPrefix, settings.Version, settings.ElasticLicensed, settings.Initialize) if err != nil { return nil, err } b.Info.LogConsumer = consumer // begin code similar to configure if err = plugin.Initialize(); err != nil { return nil, fmt.Errorf("error initializing plugins: %w", err) } b.InputQueueSize = settings.InputQueueSize cfOpts := []ucfg.Option{ ucfg.PathSep("."), ucfg.ResolveEnv, ucfg.VarExp, } tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...) if err != nil { return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err) } cfg := (*config.C)(tmp) if err := initPaths(cfg); err != nil { return nil, fmt.Errorf("error initializing paths: %w", err) } // We have to initialize the keystore before any unpack or merging the cloud // options. store, err := LoadKeystore(cfg, b.Info.Beat) if err != nil { return nil, fmt.Errorf("could not initialize the keystore: %w", err) } if settings.DisableConfigResolver { config.OverwriteConfigOpts(obfuscateConfigOpts()) } else if store != nil { // TODO: Allow the options to be more flexible for dynamic changes // note that if the store is nil it should be excluded as an option config.OverwriteConfigOpts(configOptsWithKeystore(store)) } b.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String()) b.Info.Monitoring.SetupRegistries() b.keystore = store b.Beat.Keystore = store err = cloudid.OverwriteSettings(cfg) if err != nil { return nil, fmt.Errorf("error overwriting cloudid settings: %w", err) } b.RawConfig = cfg err = cfg.Unpack(&b.Config) if err != nil { return nil, fmt.Errorf("error unpacking config data: %w", err) } logpConfig := logp.Config{} logpConfig.AddCaller = true logpConfig.Beat = b.Info.Name logpConfig.Files.MaxSize = 1 if b.Config.Logging == nil { b.Config.Logging = config.NewConfig() } if err := b.Config.Logging.Unpack(&logpConfig); err != nil { return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging) } b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core) if err != nil { return nil, fmt.Errorf("error configuring beats logp: %w", err) } // extracting it here for ease of use logger := b.Info.Logger instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger) if err != nil { return nil, fmt.Errorf("error setting up instrumentation: %w", err) } b.Instrumentation = instrumentation if err := promoteOutputQueueSettings(b); err != nil { return nil, fmt.Errorf("could not promote output queue settings: %w", err) } if err := features.UpdateFromConfig(b.RawConfig); err != nil { return nil, fmt.Errorf("could not parse features: %w", err) } b.RegisterHostname(features.FQDN()) b.Beat.Config = &b.Config.BeatConfig if name := b.Config.Name; name != "" { b.Info.Name = name } if err := common.SetTimestampPrecision(b.Config.TimestampPrecision); err != nil { return nil, fmt.Errorf("error setting timestamp precision: %w", err) } // log paths values to help with troubleshooting logger.Infof("%s", paths.Paths.String()) metaPath := paths.Resolve(paths.Data, "meta.json") err = b.loadMeta(metaPath) if err != nil { return nil, fmt.Errorf("error loading meta data: %w", err) } logger.Infof("Beat ID: %v", b.Info.ID) // Try to get the host's FQDN and set it. h, err := sysinfo.Host() if err != nil { return nil, fmt.Errorf("failed to get host information: %w", err) } fqdnLookupCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() fqdn, err := h.FQDNWithContext(fqdnLookupCtx) if err != nil { // FQDN lookup is "best effort". We log the error, fallback to // the OS-reported hostname, and move on. logger.Warnf("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname) b.Info.FQDN = b.Info.Hostname } else { b.Info.FQDN = fqdn } // initialize config manager m, err := management.NewManager(b.Config.Management, b.Registry) if err != nil { return nil, fmt.Errorf("error creating new manager: %w", err) } b.Manager = m if b.Manager.AgentInfo().Version != "" { // During the manager initialization the client to connect to the agent is // also initialized. That makes the beat to read information sent by the // agent, which includes the AgentInfo with the agent's package version. // Components running under agent should report the agent's package version // as their own version. // In order to do so b.Info.Version needs to be set to the version the agent // sent. As this Beat instance is initialized much before the package // version is received, it's overridden here. So far it's early enough for // the whole beat to report the right version. b.Info.Version = b.Manager.AgentInfo().Version version.SetPackageVersion(b.Info.Version) } // build the user-agent string to be used by the outputs b.GenerateUserAgent() if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil { return nil, fmt.Errorf("error checking raw config: %w", err) } b.Beat.BeatConfig, err = b.BeatConfig() if err != nil { return nil, fmt.Errorf("error setting BeatConfig: %w", err) } imFactory := settings.IndexManagement if imFactory == nil { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM, logger) } b.IdxSupporter, err = imFactory(logger, b.Info, b.RawConfig) if err != nil { return nil, fmt.Errorf("error setting index supporter: %w", err) } b.Info.UseDefaultProcessors = useDefaultProcessors processingFactory := settings.Processing if processingFactory == nil { processingFactory = processing.MakeDefaultBeatSupport(true) } b.processors, err = processingFactory(b.Info, logger.Named("processors"), b.RawConfig) if err != nil { return nil, fmt.Errorf("error creating processors: %w", err) } // This should be replaced with static config for otel consumer // but need to figure out if we want the Queue settings from here. outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() if !outputEnabled { if b.Manager.Enabled() { logger.Info("Output is configured through Central Management") } else { return nil, fmt.Errorf("no outputs are defined, please define one under the output section") } } namespaceReg := b.Info.Monitoring.Namespace.GetRegistry() reg := b.Info.Monitoring.StatsRegistry.GetRegistry("libbeat") if reg == nil { reg = b.Info.Monitoring.StatsRegistry.NewRegistry("libbeat") } tel := namespaceReg.GetRegistry("state") if tel == nil { tel = namespaceReg.NewRegistry("state") } monitors := pipeline.Monitors{ Metrics: reg, Telemetry: tel, Logger: logger.Named("publisher"), Tracer: b.Instrumentation.Tracer(), } outputFactory := b.makeOutputFactory(b.Config.Output) pipelineSettings := pipeline.Settings{ Processors: b.processors, InputQueueSize: b.InputQueueSize, } publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings) if err != nil { return nil, fmt.Errorf("error initializing publisher: %w", err) } b.Registry.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) b.Publisher = publisher return b, nil } // InitWithSettings does initialization of things common to all actions (read confs, flags) func (b *Beat) InitWithSettings(settings Settings) error { err := b.handleFlags() if err != nil { return err } if err := plugin.Initialize(); err != nil { return err } if err := b.configure(settings); err != nil { return err } return nil } // Init does initialization of things common to all actions (read confs, flags) // // Deprecated: use InitWithSettings func (b *Beat) Init() error { return b.InitWithSettings(Settings{}) } // BeatConfig returns config section for this beat func (b *Beat) BeatConfig() (*config.C, error) { configName := strings.ToLower(b.Info.Beat) if b.RawConfig.HasField(configName) { sub, err := b.RawConfig.Child(configName, -1) if err != nil { return nil, err } return sub, nil } return config.NewConfig(), nil } // Keystore return the configured keystore for this beat func (b *Beat) Keystore() keystore.Keystore { return b.keystore } // create and return the beater, this method also initializes all needed items, // including template registering, publisher, xpack monitoring func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { sub, err := b.BeatConfig() if err != nil { return nil, err } log := b.Info.Logger.Named("beat") log.Infof("Setup Beat: %s; Version: %s (FIPS-distribution: %v)", b.Info.Beat, b.Info.Version, b.Info.FIPSDistribution) b.logSystemInfo(log) err = b.registerESVersionCheckCallback() if err != nil { return nil, err } err = b.registerESIndexManagement() if err != nil { return nil, err } b.registerClusterUUIDFetching() reg := monitoring.Default.GetRegistry("libbeat") if reg == nil { reg = monitoring.Default.NewRegistry("libbeat") } err = metricreport.SetupMetrics(b.Info.Logger.Named("metrics"), b.Info.Beat, version.GetDefaultVersion()) if err != nil { return nil, err } // Report central management state mgmt := b.Info.Monitoring.StateRegistry.NewRegistry("management") monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled()) log.Debug("Initializing output plugins") outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() if !outputEnabled { if b.Manager.Enabled() { b.Info.Logger.Info("Output is configured through Central Management") } else { msg := "no outputs are defined, please define one under the output section" b.Info.Logger.Info("%s", msg) return nil, errors.New(msg) } } var publisher *pipeline.Pipeline monitors := pipeline.Monitors{ Metrics: reg, Telemetry: b.Info.Monitoring.StateRegistry, Logger: b.Info.Logger.Named("publisher"), Tracer: b.Instrumentation.Tracer(), } outputFactory := b.makeOutputFactory(b.Config.Output) settings := pipeline.Settings{ // Since now publisher is closed on Stop, we want to give some // time to ack any pending events by default to avoid // changing on stop behavior too much. WaitClose: time.Second, Processors: b.processors, InputQueueSize: b.InputQueueSize, } publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) if err != nil { return nil, fmt.Errorf("error initializing publisher: %w", err) } b.Registry.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) b.Publisher = publisher beater, err := bt(&b.Beat, sub) if err != nil { return nil, err } return beater, nil } func (b *Beat) launch(settings Settings, bt beat.Creator) error { logger := b.Info.Logger defer func() { _ = logger.Sync() }() defer logger.Infof("%s stopped.", b.Info.Beat) defer func() { if err := b.processors.Close(); err != nil { logger.Warnf("Failed to close global processing: %v", err) } }() // Windows: Mark service as stopped. // After this is run, a Beat service is considered by the OS to be stopped // and another instance of the process can be started. // This must be the first deferred cleanup task (last to execute). defer svc.NotifyTermination() // Try to acquire exclusive lock on data path to prevent another beat instance // sharing same data path. This is disabled under elastic-agent. if !fleetmode.Enabled() { bl := locks.New(b.Info) err := bl.Lock() if err != nil { return err } defer func() { _ = bl.Unlock() }() } else { logger.Info("running under elastic-agent, per-beat lockfiles disabled") } svc.BeforeRun() defer svc.Cleanup() b.RegisterMetrics() // Start the API Server before the Seccomp lock down, we do this so we can create the unix socket // set the appropriate permission on the unix domain file without having to whitelist anything // that would be set at runtime. if b.Config.HTTP.Enabled() { var err error b.API, err = api.NewWithDefaultRoutes(logger, b.Config.HTTP, api.NamespaceLookupFunc()) if err != nil { return fmt.Errorf("could not start the HTTP server for the API: %w", err) } b.API.Start() defer func() { _ = b.API.Stop() }() if b.Config.HTTPPprof.IsEnabled() { pprof.SetRuntimeProfilingParameters(b.Config.HTTPPprof) if err := pprof.HttpAttach(b.Config.HTTPPprof, b.API); err != nil { return fmt.Errorf("failed to attach http handlers for pprof: %w", err) } } } // Do not load seccomp for osquerybeat, it was disabled before V2 in the configuration file // https://github.com/elastic/beats/blob/7cf873fd340172c33f294500ccfec948afd7a47c/x-pack/osquerybeat/osquerybeat.yml#L16 if b.Info.Beat != "osquerybeat" { if err := seccomp.LoadFilter(b.Config.Seccomp); err != nil { return err } } beater, err := b.createBeater(bt) if err != nil { return err } r, err := b.setupMonitoring(settings) if err != nil { return err } if r != nil { defer r.Stop() } if b.Config.MetricLogging == nil || b.Config.MetricLogging.Enabled() { reporter, err := log.MakeReporter(b.Info, b.Config.MetricLogging) if err != nil { return err } defer reporter.Stop() } // only collect into a ring buffer if HTTP, and the ring buffer are explicitly enabled if b.Config.HTTP.Enabled() && monitoring.IsBufferEnabled(b.Config.BufferConfig) { buffReporter, err := buffer.MakeReporter(b.Config.BufferConfig) if err != nil { return err } defer buffReporter.Stop() if err := b.API.AttachHandler("/buffer", buffReporter); err != nil { return err } } ctx, cancel := context.WithCancel(context.Background()) // stopBeat must be idempotent since it will be called both from a signal and by the manager. // Since publisher.Close is not safe to be called more than once this is necessary. var once sync.Once stopBeat := func() { once.Do(func() { b.Instrumentation.Tracer().Close() // If the publisher has a Close() method, call it before stopping the beater. if c, ok := b.Publisher.(io.Closer); ok { c.Close() } beater.Stop() }) } svc.HandleSignals(stopBeat, cancel) // Allow the manager to stop a currently running beats out of bound. b.Manager.SetStopCallback(stopBeat) err = b.loadDashboards(ctx, false) if err != nil { return err } logger.Infof("%s start running.", b.Info.Beat) err = beater.Run(&b.Beat) if b.shouldReexec { if err := b.reexec(); err != nil { return fmt.Errorf("could not restart %s: %w", b.Info.Beat, err) } } return err } // reexec restarts the Beat, it calls the OS-specific implementation. func (b *Beat) reexec() error { return b.doReexec() } // RegisterMetrics registers metrics with the internal monitoring API. This data // is then exposed through the HTTP monitoring endpoint (e.g. /info and /state) // and/or pushed to Elasticsearch through the x-pack monitoring feature. func (b *Beat) RegisterMetrics() { // info monitoring.NewString(b.Info.Monitoring.InfoRegistry, "version").Set(b.Info.Version) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "beat").Set(b.Info.Beat) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "name").Set(b.Info.Name) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "uuid").Set(b.Info.ID.String()) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "ephemeral_id").Set(b.Info.EphemeralID.String()) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "binary_arch").Set(runtime.GOARCH) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "build_commit").Set(version.Commit()) monitoring.NewTimestamp(b.Info.Monitoring.InfoRegistry, "build_time").Set(version.BuildTime()) monitoring.NewBool(b.Info.Monitoring.InfoRegistry, "elastic_licensed").Set(b.Info.ElasticLicensed) // Add user metadata data asynchronously (on Windows the lookup can take up to 60s). go func() { if u, err := user.Current(); err != nil { // This usually happens if the user UID does not exist in /etc/passwd. It might be the case on K8S // if the user set securityContext.runAsUser to an arbitrary value. monitoring.NewString(b.Info.Monitoring.InfoRegistry, "uid").Set(strconv.Itoa(os.Getuid())) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "gid").Set(strconv.Itoa(os.Getgid())) } else { monitoring.NewString(b.Info.Monitoring.InfoRegistry, "username").Set(u.Username) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "uid").Set(u.Uid) monitoring.NewString(b.Info.Monitoring.InfoRegistry, "gid").Set(u.Gid) } }() // state.service serviceRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("service") monitoring.NewString(serviceRegistry, "version").Set(b.Info.Version) monitoring.NewString(serviceRegistry, "name").Set(b.Info.Beat) monitoring.NewString(serviceRegistry, "id").Set(b.Info.ID.String()) // state.beat beatRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("beat") monitoring.NewString(beatRegistry, "name").Set(b.Info.Name) } func (b *Beat) RegisterHostname(useFQDN bool) { hostname := b.Info.FQDNAwareHostname(useFQDN) // info.hostname monitoring.NewString(b.Info.Monitoring.InfoRegistry, "hostname").Set(hostname) // state.host monitoring.NewFunc(b.Info.Monitoring.StateRegistry, "host", host.ReportInfo(hostname), monitoring.Report) } // TestConfig check all settings are ok and the beat can be run func (b *Beat) TestConfig(settings Settings, bt beat.Creator) error { return handleError(func() error { err := b.InitWithSettings(settings) if err != nil { return err } // Create beater to ensure all settings are OK _, err = b.createBeater(bt) if err != nil { return err } fmt.Println("Config OK") //nolint:forbidigo // required to give feedback to user return beat.GracefulExit }()) } // SetupSettings holds settings necessary for beat setup type SetupSettings struct { Dashboard bool Pipeline bool IndexManagement bool // Deprecated: use IndexManagementKey instead Template bool // Deprecated: use IndexManagementKey instead ILMPolicy bool EnableAllFilesets bool ForceEnableModuleFilesets bool } // Setup registers ES index template, kibana dashboards, ml jobs and pipelines. // //nolint:forbidigo // required to give feedback to user func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) error { return handleError(func() error { err := b.InitWithSettings(settings) if err != nil { return err } // Tell the beat that we're in the setup command b.InSetupCmd = true if setup.ForceEnableModuleFilesets { if err := b.Beat.BeatConfig.SetBool("config.modules.force_enable_module_filesets", -1, true); err != nil { return fmt.Errorf("error setting force_enable_module_filesets config option %w", err) } } // Create beater to give it the opportunity to set loading callbacks _, err = b.createBeater(bt) if err != nil { return err } if setup.IndexManagement || setup.Template || setup.ILMPolicy { outCfg := b.Config.Output if !isElasticsearchOutput(outCfg.Name()) { return fmt.Errorf("index management requested but the Elasticsearch output is not configured/enabled") } ctx, cancel := context.WithCancel(context.Background()) defer cancel() esClient, err := eslegclient.NewConnectedClient(ctx, outCfg.Config(), b.Info.Beat) if err != nil { return err } // other components know to skip ILM setup under serverless, this logic block just helps us print an error message // in instances where ILM has been explicitly enabled var ilmCfg struct { Ilm *config.C `config:"setup.ilm"` } err = b.RawConfig.Unpack(&ilmCfg) if err != nil { return fmt.Errorf("error unpacking ILM config: %w", err) } if ilmCfg.Ilm.Enabled() && esClient.IsServerless() { fmt.Println("WARNING: ILM is not supported in Serverless projects") } loadTemplate, loadILM := idxmgmt.LoadModeUnset, idxmgmt.LoadModeUnset if setup.IndexManagement || setup.Template { loadTemplate = idxmgmt.LoadModeOverwrite } if setup.IndexManagement || setup.ILMPolicy { loadILM = idxmgmt.LoadModeEnabled } mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig) if err != nil { return fmt.Errorf("error creating index management handler: %w", err) } m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields)) if ok, warn := m.VerifySetup(loadTemplate, loadILM); !ok { fmt.Println(warn) } if err = m.Setup(loadTemplate, loadILM); err != nil { return err } fmt.Println("Index setup finished.") } if setup.Dashboard && settings.HasDashboards { fmt.Println("Loading dashboards (Kibana must be running and reachable)") err = b.loadDashboards(context.Background(), true) if err != nil { var notFoundErr *dashboards.ErrNotFound if errors.As(err, &notFoundErr) { fmt.Printf("Skipping loading dashboards, %+v\n", err) } else { return err } } else { fmt.Println("Loaded dashboards") } } if setup.Pipeline && b.OverwritePipelinesCallback != nil { if setup.EnableAllFilesets { if err := b.Beat.BeatConfig.SetBool("config.modules.enable_all_filesets", -1, true); err != nil { return fmt.Errorf("error setting enable_all_filesets config option %w", err) } } esConfig := b.Config.Output.Config() err = b.OverwritePipelinesCallback(esConfig) if err != nil { return err } fmt.Println("Loaded Ingest pipelines") } return nil }()) } // handleFlags converts -flag to --flags, parses the command line // flags, and it invokes the HandleFlags callback if implemented by // the Beat. func (b *Beat) handleFlags() error { flag.Parse() return cfgfile.HandleFlags() } // config reads the configuration file from disk, parses the common options // defined in BeatConfig, initializes logging, and set GOMAXPROCS if defined // in the config. Lastly it invokes the Config method implemented by the beat. func (b *Beat) configure(settings Settings) error { var err error b.InputQueueSize = settings.InputQueueSize cfg, err := cfgfile.Load("", settings.ConfigOverrides) if err != nil { return fmt.Errorf("error loading config file: %w", err) } b.Info.Monitoring.SetupRegistries() if err := initPaths(cfg); err != nil { return err } // We have to initialize the keystore before any unpack or merging the cloud // options. store, err := LoadKeystore(cfg, b.Info.Beat) if err != nil { return fmt.Errorf("could not initialize the keystore: %w", err) } if settings.DisableConfigResolver { config.OverwriteConfigOpts(obfuscateConfigOpts()) } else if store != nil { // TODO: Allow the options to be more flexible for dynamic changes // note that if the store is nil it should be excluded as an option config.OverwriteConfigOpts(configOptsWithKeystore(store)) } b.keystore = store b.Beat.Keystore = store err = cloudid.OverwriteSettings(cfg) if err != nil { return err } b.RawConfig = cfg err = cfg.Unpack(&b.Config) if err != nil { return fmt.Errorf("error unpacking config data: %w", err) } b.Info.Logger, err = configure.LoggingWithTypedOutputsLocal(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType) if err != nil { return fmt.Errorf("error initializing logging: %w", err) } // extracting here for ease of use logger := b.Info.Logger if err := promoteOutputQueueSettings(b); err != nil { return fmt.Errorf("could not promote output queue settings: %w", err) } if err := features.UpdateFromConfig(b.RawConfig); err != nil { return fmt.Errorf("could not parse features: %w", err) } b.RegisterHostname(features.FQDN()) b.Beat.Config = &b.Config.BeatConfig if name := b.Config.Name; name != "" { b.Info.Name = name } if err := common.SetTimestampPrecision(b.Config.TimestampPrecision); err != nil { return fmt.Errorf("error setting timestamp precision: %w", err) } instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, b.Info.Logger) if err != nil { return err } b.Instrumentation = instrumentation // log paths values to help with troubleshooting logger.Infof("%s", paths.Paths.String()) metaPath := paths.Resolve(paths.Data, "meta.json") err = b.loadMeta(metaPath) if err != nil { return err } logger.Infof("Beat ID: %v", b.Info.ID) // Try to get the host's FQDN and set it. h, err := sysinfo.Host() if err != nil { return fmt.Errorf("failed to get host information: %w", err) } fqdnLookupCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() fqdn, err := h.FQDNWithContext(fqdnLookupCtx) if err != nil { // FQDN lookup is "best effort". We log the error, fallback to // the OS-reported hostname, and move on. logger.Warnf("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname) b.Info.FQDN = b.Info.Hostname } else { b.Info.FQDN = fqdn } // initialize config manager m, err := management.NewManager(b.Config.Management, b.Registry) if err != nil { return err } b.Manager = m if b.Manager.AgentInfo().Version != "" { // During the manager initialization the client to connect to the agent is // also initialized. That makes the beat to read information sent by the // agent, which includes the AgentInfo with the agent's package version. // Components running under agent should report the agent's package version // as their own version. // In order to do so b.Info.Version needs to be set to the version the agent // sent. As this Beat instance is initialized much before the package // version is received, it's overridden here. So far it's early enough for // the whole beat to report the right version. b.Info.Version = b.Manager.AgentInfo().Version version.SetPackageVersion(b.Info.Version) } // build the user-agent string to be used by the outputs b.GenerateUserAgent() if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil { return err } if maxProcs := b.Config.MaxProcs; maxProcs > 0 { logger.Infof("Set max procs limit: %v", maxProcs) runtime.GOMAXPROCS(maxProcs) } if gcPercent := b.Config.GCPercent; gcPercent > 0 { logger.Infof("Set gc percentage to: %v", gcPercent) debug.SetGCPercent(gcPercent) } b.Info.Monitoring.Namespace = monitoring.GetNamespace("dataset") b.Beat.BeatConfig, err = b.BeatConfig() if err != nil { return err } imFactory := settings.IndexManagement if imFactory == nil { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM, logger) } b.IdxSupporter, err = imFactory(logger, b.Info, b.RawConfig) if err != nil { return err } processingFactory := settings.Processing if processingFactory == nil { processingFactory = processing.MakeDefaultBeatSupport(true) } b.processors, err = processingFactory(b.Info, logger.Named("processors"), b.RawConfig) b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors", "global_processors.txt", "text/plain", b.agentDiagnosticHook) b.Manager.RegisterDiagnosticHook("beat_metrics", "Metrics from the default monitoring namespace and expvar.", "beat_metrics.json", "application/json", func() []byte { m := monitoring.CollectStructSnapshot(monitoring.Default, monitoring.Full, true) data, err := json.MarshalIndent(m, "", " ") if err != nil { logger.Warnw("Failed to collect beat metric snapshot for Agent diagnostics.", "error", err) return []byte(err.Error()) } return data }) return err } // agentDiagnosticHook is the callback function sent to the agent manager RegisterDiagnosticHook function // right now, this only returns information on the global processors; however, in the future, we might find it useful // to expand this to other components of the beat state. // To anyone refactoring: be careful to make sure the callback is registered after the global processors are initialized func (b *Beat) agentDiagnosticHook() []byte { list := b.processors.Processors() var debugBytes []byte for _, proc := range list { debugBytes = append(debugBytes, []byte(proc+"\n")...) } return debugBytes } func (b *Beat) loadMeta(metaPath string) error { type meta struct { UUID uuid.UUID `json:"uuid"` FirstStart time.Time `json:"first_start"` } b.Info.Logger.Named("beat").Debugf("Beat metadata path: %v", metaPath) f, err := openRegular(metaPath) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("meta file failed to open: %w", err) } if err == nil { m := meta{} if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF { f.Close() return fmt.Errorf("Beat meta file reading error: %w", err) } f.Close() if !m.FirstStart.IsZero() { b.Info.FirstStart = m.FirstStart } valid := m.UUID != uuid.Nil if valid { b.Info.ID = m.UUID } if valid && !m.FirstStart.IsZero() { return nil } } // file does not exist or ID is invalid or first start time is not defined, let's create a new one // write temporary file first tempFile := metaPath + ".new" f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600) if err != nil { return fmt.Errorf("failed to create Beat meta file: %w", err) } encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart}) err = f.Sync() if err != nil { return fmt.Errorf("Beat meta file failed to write: %w", err) } err = f.Close() if err != nil { return fmt.Errorf("Beat meta file failed to write: %w", err) } if encodeErr != nil { return fmt.Errorf("Beat meta file failed to write: %w", encodeErr) } // move temporary file into final location err = file.SafeFileRotate(metaPath, tempFile) return err } func openRegular(filename string) (*os.File, error) { f, err := os.Open(filename) if err != nil { return f, err } info, err := f.Stat() if err != nil { f.Close() return nil, err } if !info.Mode().IsRegular() { f.Close() if info.IsDir() { return nil, fmt.Errorf("%s is a directory", filename) } return nil, fmt.Errorf("%s is not a regular file", filename) } return f, nil } func (b *Beat) loadDashboards(ctx context.Context, force bool) error { if force { // force implies dashboards.enabled=true if b.Config.Dashboards == nil { b.Config.Dashboards = config.NewConfig() } err := b.Config.Dashboards.SetBool("enabled", -1, true) if err != nil { return fmt.Errorf("error setting dashboard.enabled=true: %w", err) } } if b.Config.Dashboards.Enabled() { // Initialize kibana config. If username and password is set in elasticsearch output config but not in kibana, // initKibanaConfig will attach the username and password into kibana config as a part of the initialization. kibanaConfig := InitKibanaConfig(b.Config) client, err := kbn.NewKibanaClient(kibanaConfig, b.Info.Beat, b.Info.Version, version.Commit(), version.BuildTime().String()) if err != nil { return fmt.Errorf("error connecting to Kibana: %w", err) } // This fetches the version for Kibana. For the alias feature the version of ES would be needed // but it's assumed that KB and ES have the same minor version. v := client.GetVersion() indexPattern, err := kibana.NewGenerator(b.Info.IndexPrefix, b.Info.Beat, b.Fields, b.Info.Version, v, b.Config.Migration.Enabled()) if err != nil { return fmt.Errorf("error creating index pattern generator: %w", err) } pattern, err := indexPattern.Generate() if err != nil { return fmt.Errorf("error generating index pattern: %w", err) } err = dashboards.ImportDashboards(ctx, b.Info, paths.Resolve(paths.Home, ""), kibanaConfig, b.Config.Dashboards, nil, pattern) if err != nil { return fmt.Errorf("error importing Kibana dashboards: %w", err) } b.Info.Logger.Info("Kibana dashboards successfully loaded.") } return nil } // registerESVersionCheckCallback registers a global callback to make sure ES instance we are connecting // to is at least on the same version as the Beat. // If the check is disabled or the output is not Elasticsearch, nothing happens. func (b *Beat) registerESVersionCheckCallback() error { _, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { if !isElasticsearchOutput(b.Config.Output.Name()) { return errors.New("elasticsearch output is not configured") } // if we allow older versions, return early and don't check versions // versions don't matter on serverless, so always bypass if b.isConnectionToOlderVersionAllowed() || conn.IsServerless() { return nil } esVersion := conn.GetVersion() beatVersion, err := libversion.New(b.Info.Version) if err != nil { return fmt.Errorf("error fetching version from elasticsearch: %w", err) } if esVersion.LessThanMajorMinor(beatVersion) { return fmt.Errorf("%w ES=%s, Beat=%s", elasticsearch.ErrTooOld, esVersion.String(), b.Info.Version) } return nil }) return err } func (b *Beat) isConnectionToOlderVersionAllowed() bool { config := struct { AllowOlder bool `config:"allow_older_versions"` }{true} _ = b.Config.Output.Config().Unpack(&config) return config.AllowOlder } // registerESIndexManagement registers the loading of the template and ILM // policy as a callback with the elasticsearch output. It is important the // registration happens before the publisher is created. func (b *Beat) registerESIndexManagement() error { if !b.IdxSupporter.Enabled() { return nil } _, err := elasticsearch.RegisterConnectCallback(b.indexSetupCallback()) if err != nil { return fmt.Errorf("failed to register index management with elasticsearch: %w", err) } return nil } func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { return func(esClient *eslegclient.Connection) error { mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig) if err != nil { return fmt.Errorf("error creating index management handler: %w", err) } m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields)) return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled) } } func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error { if update == nil { return nil } if b.OutputConfigReloader != nil { if err := b.OutputConfigReloader.Reload(update); err != nil { return err } } // we need to update the output configuration because // some callbacks are relying on it to be up to date. // e.g. the Elasticsearch version validation if update.Config != nil { err := b.Config.Output.Unpack(update.Config) if err != nil { return err } } return outReloader.Reload(update, b.createOutput) }) } func (b *Beat) makeOutputFactory( cfg config.Namespace, ) func(outputs.Observer) (string, outputs.Group, error) { return func(outStats outputs.Observer) (string, outputs.Group, error) { out, err := b.createOutput(outStats, cfg) return cfg.Name(), out, err } } func (b *Beat) reloadOutputOnCertChange(cfg config.Namespace) error { logger := b.Info.Logger.Named("ssl.cert.reloader") // Here the output is created and we have access to the Beat struct (with the manager) // as a workaround we can unpack the new settings and trigger the reload-watcher from here // We get an output config, so we extract the 'SSL' bit from it rawTLSCfg, err := cfg.Config().Child("ssl", -1) if err != nil { var e ucfg.Error if errors.As(err, &e) { if errors.Is(e.Reason(), ucfg.ErrMissing) { // if the output configuration does not contain a `ssl` section // do nothing and return no error return nil } } return fmt.Errorf("could not extract the 'ssl' section of the output config: %w", err) } extendedTLSCfg := defaultCertReloadConfig() if err := rawTLSCfg.Unpack(&extendedTLSCfg); err != nil { return fmt.Errorf("unpacking 'ssl' config: %w", err) } if !extendedTLSCfg.Reload.Enabled { return nil } logger.Debug("exit on CA certs change enabled") possibleFilesToWatch := append( extendedTLSCfg.CAs, extendedTLSCfg.Certificate.Certificate, extendedTLSCfg.Certificate.Key, ) filesToWatch := []string{} for _, f := range possibleFilesToWatch { if f == "" { continue } if tlscommon.IsPEMString(f) { // That's an embedded cert, we're only interested in files continue } logger.Debugf("watching '%s' for changes", f) filesToWatch = append(filesToWatch, f) } // If there are no files to watch, don't do anything. if len(filesToWatch) == 0 { logger.Debug("no files to watch, filewatcher will not be started") return nil } watcher := filewatcher.New(filesToWatch...) // Ignore the first scan as it will always return // true for files changed. The output has not been // started yet, so even if the files have changed since // the Beat started, they don't need to be reloaded _, _, _ = watcher.Scan() // Watch for file changes while the Beat is alive go func() { ticker := time.Tick(extendedTLSCfg.Reload.Period) for { <-ticker files, changed, err := watcher.Scan() if err != nil { logger.Warnf("could not scan certificate files: %s", err.Error()) } if changed { logger.Infof( "some of the following files have been modified: %v, restarting %s.", files, b.Info.Beat) b.shouldReexec = true b.Manager.Stop() // we're done, finish the goroutine just for the sake of it return } } }() return nil } func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) { if !cfg.IsSet() { return outputs.Group{}, nil } if err := b.reloadOutputOnCertChange(cfg); err != nil { return outputs.Group{}, fmt.Errorf("could not setup output certificates reloader: %w", err) } return outputs.Load(b.IdxSupporter, b.Info, stats, cfg.Name(), cfg.Config()) } func (b *Beat) registerClusterUUIDFetching() { callback := b.clusterUUIDFetchingCallback() _, _ = elasticsearch.RegisterConnectCallback(callback) } // Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring func (b *Beat) clusterUUIDFetchingCallback() elasticsearch.ConnectCallback { elasticsearchRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("outputs.elasticsearch") clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid") callback := func(esClient *eslegclient.Connection) error { var response struct { ClusterUUID string `json:"cluster_uuid"` } status, body, err := esClient.Request("GET", "/", "", nil, nil) if err != nil { return fmt.Errorf("error querying /: %w", err) } if status > 299 { return fmt.Errorf("error querying /. Status: %d. Response body: %s", status, body) } err = json.Unmarshal(body, &response) if err != nil { return fmt.Errorf("error unmarshaling json when querying /. Body: %s", body) } clusterUUIDRegVar.Set(response.ClusterUUID) return nil } return callback } func (b *Beat) setupMonitoring(settings Settings) (report.Reporter, error) { monitoringCfg := b.Config.MonitoringBeatConfig.Monitoring monitoringClusterUUID, err := monitoring.GetClusterUUID(monitoringCfg) if err != nil { return nil, err } // Expose monitoring.cluster_uuid in state API if monitoringClusterUUID != "" { monitoringRegistry := b.Info.Monitoring.StateRegistry.NewRegistry("monitoring") clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid") clusterUUIDRegVar.Set(monitoringClusterUUID) } if monitoring.IsEnabled(monitoringCfg) { err := monitoring.OverrideWithCloudSettings(monitoringCfg) if err != nil { return nil, err } settings := report.Settings{ DefaultUsername: settings.Monitoring.DefaultUsername, ClusterUUID: monitoringClusterUUID, } reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output) if err != nil { return nil, err } return reporter, nil } return nil, nil } // handleError handles the given error by logging it and then returning the // error. If the err is nil or is a GracefulExit error then the method will // return nil without logging anything. func handleError(err error) error { if err == nil || err == beat.GracefulExit { //nolint:errorlint // keep old behaviour return nil } // logp may not be initialized so log the err to stderr too. logp.Critical("Exiting: %v", err) fmt.Fprintf(os.Stderr, "Exiting: %v\n", err) return err } // logSystemInfo logs information about this system for situational awareness // in debugging. This information includes data about the beat, build, go // runtime, host, and process. If any of the data is not available it will be // omitted. func (b *Beat) logSystemInfo(log *logp.Logger) { defer log.Recover("An unexpected error occurred while collecting " + "information about the system.") log = log.With(logp.Namespace("system_info")) if b.Manager.Enabled() { return } // Beat beat := mapstr.M{ "type": b.Info.Beat, "uuid": b.Info.ID, "path": mapstr.M{ "config": paths.Resolve(paths.Config, ""), "data": paths.Resolve(paths.Data, ""), "home": paths.Resolve(paths.Home, ""), "logs": paths.Resolve(paths.Logs, ""), }, } log.Infow("Beat info", "beat", beat) // Build build := mapstr.M{ "commit": version.Commit(), "time": version.BuildTime(), "version": b.Info.Version, "libbeat": version.GetDefaultVersion(), } log.Infow("Build info", "build", build) // Go Runtime log.Infow("Go runtime info", "go", sysinfo.Go()) // Host if host, err := sysinfo.Host(); err == nil { hostInfo := host.Info() hostInfo.IPs = sanitizeIPs(hostInfo.IPs) log.Infow("Host info", "host", hostInfo) } // Process if self, err := sysinfo.Self(); err == nil { process := mapstr.M{} if info, err := self.Info(); err == nil { process["name"] = info.Name process["pid"] = info.PID process["ppid"] = info.PPID process["cwd"] = info.CWD process["exe"] = info.Exe process["start_time"] = info.StartTime } if proc, ok := self.(types.Seccomp); ok { if seccomp, err := proc.Seccomp(); err == nil { process["seccomp"] = seccomp } } if proc, ok := self.(types.Capabilities); ok { if caps, err := proc.Capabilities(); err == nil { process["capabilities"] = caps } } if len(process) > 0 { log.Infow("Process info", "process", process) } } } // configOptsWithKeystore returns ucfg config options with a resolver linked to the current keystore. // Refactor to allow insert into the config option array without having to redefine everything func configOptsWithKeystore(store keystore.Keystore) []ucfg.Option { return []ucfg.Option{ ucfg.PathSep("."), ucfg.Resolve(keystore.ResolverWrap(store)), ucfg.ResolveEnv, ucfg.VarExp, } } // obfuscateConfigOpts disables any resolvers in the configuration, instead we return the field // reference string directly. func obfuscateConfigOpts() []ucfg.Option { return []ucfg.Option{ ucfg.PathSep("."), ucfg.ResolveNOOP, } } func InitKibanaConfig(beatConfig beatConfig) *config.C { var esConfig *config.C if isElasticsearchOutput(beatConfig.Output.Name()) { esConfig = beatConfig.Output.Config() } // init kibana config object kibanaConfig := beatConfig.Kibana if kibanaConfig == nil { kibanaConfig = config.NewConfig() } if esConfig.Enabled() { username, _ := esConfig.String("username", -1) password, _ := esConfig.String("password", -1) api_key, _ := esConfig.String("api_key", -1) if !kibanaConfig.HasField("username") && username != "" { _ = kibanaConfig.SetString("username", -1, username) } if !kibanaConfig.HasField("password") && password != "" { _ = kibanaConfig.SetString("password", -1, password) } if !kibanaConfig.HasField("api_key") && api_key != "" { _ = kibanaConfig.SetString("api_key", -1, api_key) } } return kibanaConfig } func isElasticsearchOutput(name string) bool { return name == "elasticsearch" } func initPaths(cfg *config.C) error { // To Fix the chicken-egg problem with the Keystore and the loading of the configuration // files we are doing a partial unpack of the configuration file and only take into consideration // the paths field. After we will unpack the complete configuration and keystore reference // will be correctly replaced. partialConfig := struct { Path paths.Path `config:"path"` }{} if err := cfg.Unpack(&partialConfig); err != nil { return fmt.Errorf("error extracting default paths: %w", err) } if err := paths.InitPaths(&partialConfig.Path); err != nil { return fmt.Errorf("error setting default paths: %w", err) } return nil } // every IP address received from `Info()` has a netmask suffix // which makes every IP address invalid from the validation perspective. // If this log entry is ingested to a data stream as it is, the event will be dropped. // We must make sure every address is valid and does not have suffixes func sanitizeIPs(ips []string) []string { validIPs := make([]string, 0, len(ips)) for _, ip := range ips { if ip == "" { continue } trimIndex := strings.LastIndexByte(ip, '/') if trimIndex != -1 { ip = ip[:trimIndex] } if net.ParseIP(ip) == nil { continue } validIPs = append(validIPs, ip) } return validIPs } // promoteOutputQueueSettings checks to see if the output // configuration has queue settings defined and if so it promotes them // to the top level queue settings. This is done to allow existing // behavior of specifying queue settings at the top level or like // elastic-agent that specifies queue settings under the output func promoteOutputQueueSettings(b *Beat) error { if b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() { pc := pipeline.Config{} err := b.Config.Output.Config().Unpack(&pc) if err != nil { return fmt.Errorf("error unpacking output queue settings: %w", err) } if pc.Queue.IsSet() { b.Info.Logger.Info("global queue settings replaced with output queue settings") b.Config.Pipeline.Queue = pc.Queue } } return nil } func (bc *beatConfig) Validate() error { if bc.Output.IsSet() && bc.Output.Config().Enabled() { outputPC := pipeline.Config{} err := bc.Output.Config().Unpack(&outputPC) if err != nil { return fmt.Errorf("error unpacking output queue settings: %w", err) } if bc.Pipeline.Queue.IsSet() && outputPC.Queue.IsSet() { return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed") } // elastic-agent doesn't support disk queue yet if bc.Management.Enabled() && outputPC.Queue.Config().Enabled() && outputPC.Queue.Name() == diskqueue.QueueType { return fmt.Errorf("disk queue is not supported when management is enabled") } } // elastic-agent doesn't support disk queue yet if bc.Management.Enabled() && bc.Pipeline.Queue.Config().Enabled() && bc.Pipeline.Queue.Name() == diskqueue.QueueType { return fmt.Errorf("disk queue is not supported when management is enabled") } return nil }