heartbeat/beater/heartbeat.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"syscall"
	"time"

	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"

	"github.com/elastic/beats/v7/heartbeat/config"
	"github.com/elastic/beats/v7/heartbeat/hbregistry"
	"github.com/elastic/beats/v7/heartbeat/monitors"
	"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
	"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
	"github.com/elastic/beats/v7/heartbeat/scheduler"
	_ "github.com/elastic/beats/v7/heartbeat/security"
	"github.com/elastic/beats/v7/heartbeat/tracer"
	"github.com/elastic/beats/v7/libbeat/autodiscover"
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/cfgfile"
	"github.com/elastic/beats/v7/libbeat/common/backoff"
	"github.com/elastic/beats/v7/libbeat/common/reload"
	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
	"github.com/elastic/beats/v7/libbeat/management"
)

// Heartbeat represents the root data structure of this beat.
type Heartbeat struct {
	done     chan struct{}
	stopOnce sync.Once
	// config is used for iterating over elements of the config.
	config             *config.Config
	scheduler          *scheduler.Scheduler
	monitorReloader    *cfgfile.Reloader
	monitorFactory     *monitors.RunnerFactory
	autodiscover       *autodiscover.Autodiscover
	replaceStateLoader func(sl monitorstate.StateLoader)
	trace              tracer.Tracer
}

// New creates a new heartbeat.
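// It parses the raw config, connects the optional socket tracer and the
// Elasticsearch-backed monitor state loader, and wires the scheduler and
// monitor factory together; monitors themselves are only started in Run.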
func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
	parsedConfig := config.DefaultConfig()
	if err := rawConfig.Unpack(&parsedConfig); err != nil {
		return nil, fmt.Errorf("error reading config file: %w", err)
	}

	// The sock tracer should be set up before any other code to ensure its
	// reliability. The ES loader, for instance, can exit early.
	var trace tracer.Tracer = tracer.NewNoopTracer()
	stConfig := parsedConfig.SocketTrace
	if stConfig != nil {
		// Note: this intentionally blocks until connected to the trace endpoint.
		var err error
		logp.L().Infof("Setting up sock tracer at %s (wait: %s)", stConfig.Path, stConfig.Wait)
		sockTrace, err := tracer.NewSockTracer(stConfig.Path, stConfig.Wait)
		if err == nil {
			trace = sockTrace
		} else {
			logp.L().Warnf("could not connect to socket trace at path %s after %s timeout: %v", stConfig.Path, stConfig.Wait, err)
		}
	}

	// Check if any of these can prevent using the states client.
	stateLoader, replaceStateLoader := monitorstate.AtomicStateLoader(monitorstate.NilStateLoader)
	if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() {
		// Connect to ES and set up the state loader if the output is not managed by agent.
		// Note: this intentionally blocks until connected or max attempts are reached.
		esClient, err := makeESClient(context.TODO(), b.Config.Output.Config(), 3, 2*time.Second)
		if err != nil {
			if parsedConfig.RunOnce {
				trace.Abort()
				return nil, fmt.Errorf("run_once mode fatal error: %w", err)
			} else {
				logp.L().Warnf("skipping monitor state management: %v", err)
			}
		} else {
			replaceStateLoader(monitorstate.MakeESLoader(esClient, monitorstate.DefaultDataStreams, parsedConfig.RunFrom))
		}
	} else if b.Manager.Enabled() {
		stateLoader, replaceStateLoader = monitorstate.DeferredStateLoader(monitorstate.NilStateLoader, 15*time.Second)
	}

	limit := parsedConfig.Scheduler.Limit
	schedLocationName := parsedConfig.Scheduler.Location
	if schedLocationName == "" {
		schedLocationName = "Local"
	}
	location, err := time.LoadLocation(schedLocationName)
	if err != nil {
		return nil, err
	}
	jobConfig := parsedConfig.Jobs

	sched := scheduler.Create(limit, hbregistry.SchedulerRegistry, location, jobConfig, parsedConfig.RunOnce)

	pipelineClientFactory := func(p beat.Pipeline) (beat.Client, error) {
		return p.Connect()
	}

	bt := &Heartbeat{
		done:               make(chan struct{}),
		config:             parsedConfig,
		scheduler:          sched,
		replaceStateLoader: replaceStateLoader,
		// monitorFactory is the factory used for creating all monitor instances,
		// wiring them up to everything needed to actually execute.
		monitorFactory: monitors.NewFactory(monitors.FactoryParams{
			BeatInfo:              b.Info,
			AddTask:               sched.Add,
			StateLoader:           stateLoader,
			PluginsReg:            plugin.GlobalPluginsReg,
			PipelineClientFactory: pipelineClientFactory,
			BeatRunFrom:           parsedConfig.RunFrom,
		}),
		trace: trace,
	}

	runFromID := "<unknown location>"
	if parsedConfig.RunFrom != nil {
		runFromID = parsedConfig.RunFrom.ID
	}
	logp.L().Infof("heartbeat starting, running from: %v", runFromID)

	return bt, nil
}

// Run executes the beat.
func (bt *Heartbeat) Run(b *beat.Beat) error {
	bt.trace.Start()
	defer bt.trace.Close()

	// Adapt the local pipeline to synchronized mode if run_once is enabled.
	pipeline := b.Publisher
	var pipelineWrapper monitors.PipelineWrapper = &monitors.NoopPipelineWrapper{}
	if bt.config.RunOnce {
		sync := &monitors.SyncPipelineWrapper{}
		pipeline = monitors.WithSyncPipelineWrapper(pipeline, sync)
		pipelineWrapper = sync
	}

	logp.L().Info("heartbeat is running! Hit CTRL-C to stop it.")
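	// Log the effective user/group IDs up front; monitors that need elevated
	// privileges (ICMP, for example) are easier to debug with this in the logs.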
	groups, _ := syscall.Getgroups()
	logp.L().Infof("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

	waitMonitors := monitors.NewSignalWait()

	// It is important that this appears before we check for run_once mode.
	// In run_once mode we depend on these monitors being loaded, but not on
	// other, more dynamic types.
	stopStaticMonitors, err := bt.RunStaticMonitors(b, pipeline)
	if err != nil {
		return err
	}
	defer stopStaticMonitors()

	if bt.config.RunOnce {
		waitMonitors.Add(monitors.WithLog(bt.scheduler.WaitForRunOnce, "Ending run_once run."))
	}

	if b.Manager.Enabled() {
		bt.RunCentralMgmtMonitors(b)
	}

	if bt.config.ConfigMonitors.Enabled() {
		bt.monitorReloader = cfgfile.NewReloader(b.Info.Logger.Named("module.reload"), b.Publisher, bt.config.ConfigMonitors)
		defer bt.monitorReloader.Stop()

		err := bt.RunReloadableMonitors()
		if err != nil {
			return err
		}
	}

	// Configure the beats Manager to start after all the reloadable hooks are
	// initialized, and to shut down when the function returns.
	if err := b.Manager.Start(); err != nil {
		return err
	}
	defer b.Manager.Stop()

	if bt.config.Autodiscover != nil {
		bt.autodiscover, err = bt.makeAutodiscover(b)
		if err != nil {
			return err
		}
		bt.autodiscover.Start()
		defer bt.autodiscover.Stop()
	}

	defer bt.scheduler.Stop()

	// Wait until run_once ends or bt is being shut down.
	waitMonitors.AddChan(bt.done)
	waitMonitors.Wait()

	logp.L().Info("Shutting down, waiting for output to complete")

	// Due to defer's LIFO execution order, waitPublished.Wait() has to be
	// located _after_ b.Manager.Stop(), or else it will exit early.
	waitPublished := monitors.NewSignalWait()
	defer waitPublished.Wait()

	// Three possible events: global beat shutdown, run_once pipeline done, and publish timeout.
	waitPublished.AddChan(bt.done)
	waitPublished.Add(monitors.WithLog(pipelineWrapper.Wait, "shutdown: finished publishing events."))
	if bt.config.PublishTimeout > 0 {
		logp.L().Infof("shutdown: output timer started. Waiting for max %v.", bt.config.PublishTimeout)
		waitPublished.Add(monitors.WithLog(monitors.WaitDuration(bt.config.PublishTimeout), "shutdown: timed out waiting for pipeline to publish events."))
	}

	return nil
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat, pipeline beat.Pipeline) (stop func(), err error) {
	runners := make([]cfgfile.Runner, 0, len(bt.config.Monitors))
	for _, cfg := range bt.config.Monitors {
		created, err := bt.monitorFactory.Create(pipeline, cfg)
		if err != nil {
			if errors.Is(err, monitors.ErrMonitorDisabled) {
				logp.L().Infof("skipping disabled monitor: %s", err)
				continue // don't stop loading monitors just because one is disabled
			}
			return nil, fmt.Errorf("could not create monitor: %w", err)
		}
		created.Start()
		runners = append(runners, created)
	}

	stop = func() {
		for _, runner := range runners {
			runner.Stop()
		}
	}
	return stop, nil
}

// RunCentralMgmtMonitors loads any monitor configs delivered via central management.
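// It registers an output reloader so that, when the managed elasticsearch
// output changes, the monitor state loader is swapped to point at the new
// cluster, and it registers the monitor runner list as a managed input.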
func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
	// Register an output reloader for managed outputs.
	b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
		// Do not return an error here; it would prevent the libbeat output from processing the same event.
		if r == nil {
			return nil
		}

		outCfg := conf.Namespace{}
		//nolint:nilerr // we are intentionally ignoring specific errors here
		if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
			return nil
		}

		// Backoff panics with a 0 duration, so set it to the smallest unit.
		esClient, err := makeESClient(context.TODO(), outCfg.Config(), 1, 1*time.Nanosecond)
		if err != nil {
			logp.L().Warnf("skipping monitor state management during managed reload: %v", err)
		} else {
			bt.replaceStateLoader(monitorstate.MakeESLoader(esClient, monitorstate.DefaultDataStreams, bt.config.RunFrom))
		}

		return nil
	})

	inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher, b.Info.Logger)
	b.Registry.MustRegisterInput(inputs)
}

// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunReloadableMonitors() (err error) {
	// Check monitor configs.
	if err := bt.monitorReloader.Check(bt.monitorFactory); err != nil {
		logp.L().Error(fmt.Errorf("error loading reloadable monitors: %w", err))
	}

	// Run the reloader in the background; it keeps monitors in sync with config changes.
	go bt.monitorReloader.Run(bt.monitorFactory)

	return nil
}

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
	ad, err := autodiscover.NewAutodiscover(
		"heartbeat",
		b.Publisher,
		bt.monitorFactory,
		autodiscover.QueryConfig(),
		bt.config.Autodiscover,
		b.Keystore,
		b.Info.Logger,
	)
	if err != nil {
		return nil, err
	}
	return ad, nil
}

// Stop stops the beat.
func (bt *Heartbeat) Stop() {
	bt.stopOnce.Do(func() { close(bt.done) })
}

// makeESClient establishes an ES connection meant to load monitors' state.
func makeESClient(ctx context.Context, cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) {
	var (
		esClient *eslegclient.Connection
		err      error
	)

	// ES client backoff.
	connectDelay := backoff.NewEqualJitterBackoff(
		context.Background().Done(),
		wait,
		wait,
	)

	// Override the default ES request timeout: higher timeout values cannot be
	// applied on the SaaS service, where we run in tight loops and want the
	// next successive check for a given monitor to run within the next
	// scheduled interval, which could be 1m or 3m.
	// Clone the original config since we don't want this change to be global.
	newCfg, err := conf.NewConfigFrom(cfg)
	if err != nil {
		return nil, fmt.Errorf("error cloning config: %w", err)
	}
	timeout := int64((10 * time.Second).Seconds())
	if err := newCfg.SetInt("timeout", -1, timeout); err != nil {
		return nil, fmt.Errorf("error setting the ES timeout in config: %w", err)
	}

	for i := 0; i < attempts; i++ {
		esClient, err = eslegclient.NewConnectedClient(ctx, newCfg, "Heartbeat")
		if err == nil {
			connectDelay.Reset()
			return esClient, nil
		} else {
			connectDelay.Wait()
		}
	}

	return nil, fmt.Errorf("could not establish states loader connection after %d attempts, with %s delay", attempts, wait)
}
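// Note: with the arguments used in New (3 attempts, 2s wait), a failed
// Elasticsearch connection blocks startup for roughly the sum of the jittered
// delays before Heartbeat falls back to running without persisted state.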