metricbeat/beater/metricbeat.go (195 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package beater
import (
"fmt"
"sync"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/module"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
// include all metricbeat specific builders
_ "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints"
// include all metricbeat specific appenders
_ "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token"
// Add metricbeat default processors
_ "github.com/elastic/beats/v7/metricbeat/processor/add_kubernetes_metadata"
)
// Metricbeat implements the Beater interface for metricbeat.
//
// An instance is built by newMetricbeat, started with Run, and shut down with
// Stop.
type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown; closed by Stop.
stopOnce sync.Once // Guards the close of done so Stop() only closes it once.
runners []cfgfile.Runner // Active list of module runners built from the static module configs.
// config is the Metricbeat configuration unpacked in newMetricbeat.
config Config
// registry is the module registry handed to the module factories.
registry *mb.Register
// autodiscover is only set when the configuration has an autodiscover section.
autodiscover *autodiscover.Autodiscover
// Options
// moduleOptions are passed to the module factory; WithModuleOptions appends to them.
moduleOptions []module.Option
// logger is taken from the Beat info in newMetricbeat.
logger *logp.Logger
}
// Option is a functional option that adjusts a Metricbeat instance while it
// is being constructed. newMetricbeat applies the options, in the order given,
// right after the instance has been allocated.
type Option func(m *Metricbeat)
// WithModuleOptions returns an Option that appends the given module options
// to the ones already stored on the Metricbeat instance. The accumulated
// options are handed to the module factory whenever a module is instantiated.
func WithModuleOptions(options ...module.Option) Option {
	appendOptions := func(m *Metricbeat) {
		m.moduleOptions = append(m.moduleOptions, options...)
	}
	return appendOptions
}
// WithLightModules returns an Option that enables light modules support.
//
// When applied, it installs a light modules source, built from the path
// resolved from paths.Home and "module", as the secondary source of the
// package-level mb.Registry. The Metricbeat instance passed to the option is
// not consulted.
func WithLightModules() Option {
	return func(_ *Metricbeat) {
		lightModulesPath := paths.Resolve(paths.Home, "module")
		source := mb.NewLightModulesSource(lightModulesPath)
		mb.Registry.SetSecondarySource(source)
	}
}
// Creator returns a beat.Creator for instantiating a new instance of the
// Metricbeat framework with the given options. The instance is built against
// the package-level mb.Registry, which is read each time the returned creator
// is invoked.
//
// When construction fails, the returned creator yields an untyped nil
// beat.Beater together with the error.
func Creator(options ...Option) beat.Creator {
	return func(b *beat.Beat, c *conf.C) (beat.Beater, error) {
		bt, err := newMetricbeat(b, c, mb.Registry, options...)
		if err != nil {
			// Returning bt directly would wrap a nil *Metricbeat in a
			// non-nil beat.Beater interface value.
			return nil, err
		}
		return bt, nil
	}
}
// CreatorWithRegistry returns a beat.Creator for instantiating a new instance
// of the Metricbeat framework with a specific registry and the given options.
//
// When construction fails, the returned creator yields an untyped nil
// beat.Beater together with the error.
func CreatorWithRegistry(registry *mb.Register, options ...Option) beat.Creator {
	return func(b *beat.Beat, c *conf.C) (beat.Beater, error) {
		bt, err := newMetricbeat(b, c, registry, options...)
		if err != nil {
			// Returning bt directly would wrap a nil *Metricbeat in a
			// non-nil beat.Beater interface value.
			return nil, err
		}
		return bt, nil
	}
}
// DefaultCreator returns a beat.Creator for instantiating a new instance of
// the Metricbeat framework with light modules support enabled and the module
// options module.WithMetricSetInfo and module.WithServiceName.
//
// This is equivalent to calling
//
//	beater.Creator(
//		beater.WithLightModules(),
//		beater.WithModuleOptions(
//			module.WithMetricSetInfo(),
//			module.WithServiceName(),
//		),
//	)
func DefaultCreator() beat.Creator {
	lightModules := WithLightModules()
	moduleOpts := WithModuleOptions(
		module.WithMetricSetInfo(),
		module.WithServiceName(),
	)
	return Creator(lightModules, moduleOpts)
}
// DefaultTestModulesCreator returns a beat.Creator for a customized
// Metricbeat in which the module start delay is disabled. This works around
// the fact that Modules() returns the static modules (not the dynamic ones)
// with a start delay.
//
// This is equivalent to calling
//
//	beater.Creator(
//		beater.WithLightModules(),
//		beater.WithModuleOptions(
//			module.WithMetricSetInfo(),
//			module.WithMaxStartDelay(0),
//		),
//	)
func DefaultTestModulesCreator() beat.Creator {
	lightModules := WithLightModules()
	moduleOpts := WithModuleOptions(
		module.WithMetricSetInfo(),
		module.WithMaxStartDelay(0),
	)
	return Creator(lightModules, moduleOpts)
}
// newMetricbeat creates and returns a new Metricbeat instance.
//
// It unpacks c on top of defaultConfig, applies the given options, and, unless
// the Beat is running its setup command, builds one runner per enabled static
// module and the autodiscover instance when one is configured.
//
// "Dynamic configuration" below means any of: config-modules reloading,
// autodiscover, or an enabled manager on b.
//
// Errors:
//   - a wrapped error when the configuration cannot be unpacked;
//   - mb.ErrEmptyConfig when no static modules are configured and no dynamic
//     configuration is enabled;
//   - mb.ErrAllModulesDisabled when no runner was created and no dynamic
//     configuration is enabled;
//   - any error returned while attaching the inputs API handler, creating a
//     module runner, or creating the autodiscover instance.
//
// The returned *Metricbeat is nil whenever the error is non-nil.
func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Option) (*Metricbeat, error) {
	config := defaultConfig
	if err := c.Unpack(&config); err != nil {
		return nil, fmt.Errorf("error reading configuration file: %w", err)
	}

	// b.Manager is nil-checked before the diagnostic hook is registered
	// further down, so apply the same guard here instead of dereferencing a
	// nil interface. The guard stays inside the || chain so that Enabled() is
	// still only consulted when the earlier conditions are false.
	dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil ||
		(b.Manager != nil && b.Manager.Enabled())
	if !dynamicCfgEnabled && len(config.Modules) == 0 {
		return nil, mb.ErrEmptyConfig
	}

	metricbeat := &Metricbeat{
		done:     make(chan struct{}),
		config:   config,
		registry: registry,
		logger:   b.Info.Logger,
	}
	for _, applyOption := range options {
		applyOption(metricbeat)
	}

	// List all registered modules and metricsets.
	b.Info.Logger.Named("modules").Debugf("Available modules and metricsets: %s", registry.String())

	if b.InSetupCmd {
		// Return without instantiating the metricsets.
		return metricbeat, nil
	}

	// Expose the inputs API on the monitoring endpoint when one is available.
	if b.API != nil {
		if err := inputmon.AttachHandler(b.API.Router(), nil); err != nil {
			return nil, fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err)
		}
	}

	// Let the manager include a snapshot of the input metrics in diagnostics.
	if b.Manager != nil {
		b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
			"input_metrics.json", "application/json", func() []byte {
				data, err := inputmon.MetricSnapshotJSON(nil)
				if err != nil {
					b.Info.Logger.Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
					// The hook can only return bytes, so report the error text.
					return []byte(err.Error())
				}
				return data
			})
	}

	// The configured max start delay comes first; options supplied through
	// WithModuleOptions follow it in the list handed to the factory.
	moduleOptions := append(
		[]module.Option{module.WithMaxStartDelay(config.MaxStartDelay)},
		metricbeat.moduleOptions...)
	factory := module.NewFactory(b.Info, registry, moduleOptions...)

	// Create a runner for every enabled static module.
	for _, moduleCfg := range config.Modules {
		if !moduleCfg.Enabled() {
			continue
		}

		runner, err := factory.Create(b.Publisher, moduleCfg)
		if err != nil {
			return nil, err
		}

		metricbeat.runners = append(metricbeat.runners, runner)
	}

	if len(metricbeat.runners) == 0 && !dynamicCfgEnabled {
		return nil, mb.ErrAllModulesDisabled
	}

	if config.Autodiscover != nil {
		var err error
		metricbeat.autodiscover, err = autodiscover.NewAutodiscover(
			"metricbeat",
			b.Publisher,
			factory, autodiscover.QueryConfig(),
			config.Autodiscover,
			b.Keystore,
			b.Info.Logger,
		)
		if err != nil {
			return nil, err
		}
	}

	return metricbeat, nil
}
// Run starts the workers for Metricbeat and blocks until Stop is called
// and the workers complete. Each host associated with a MetricSet is given its
// own goroutine for fetching data. This ensures that each host is isolated so
// that a single unresponsive host cannot inadvertently block other hosts
// within the same Module and MetricSet from collection.
//
// Run returns early with an error if the manager fails to start or if the
// config-modules reloader check fails; otherwise it returns nil once every
// stop goroutine has finished.
func (bt *Metricbeat) Run(b *beat.Beat) error {
	var wg sync.WaitGroup

	// stopOnShutdown registers a goroutine with wg that waits for the done
	// channel to be closed and then calls stop.
	stopOnShutdown := func(stop func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-bt.done
			stop()
		}()
	}

	// Static modules (metricbeat.runners)
	for _, r := range bt.runners {
		// Per-iteration copy so each closure stops its own runner.
		staticRunner := r
		staticRunner.Start()
		stopOnShutdown(func() { staticRunner.Stop() })
	}

	// Centrally managed modules
	factory := module.NewFactory(b.Info, bt.registry, bt.moduleOptions...)
	managedModules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher, bt.logger)
	b.Registry.MustRegisterInput(managedModules)
	stopOnShutdown(func() { managedModules.Stop() })

	// Start the manager after all the reload hooks are configured,
	// the Manager is stopped at the end of the execution.
	if err := b.Manager.Start(); err != nil {
		return err
	}
	defer b.Manager.Stop()

	// Dynamic file based modules (metricbeat.config.modules)
	if bt.config.ConfigModules.Enabled() {
		reloadLogger := bt.logger.Named("module.reload")
		moduleReloader := cfgfile.NewReloader(reloadLogger, b.Publisher, bt.config.ConfigModules)

		if err := moduleReloader.Check(factory); err != nil {
			return err
		}

		go moduleReloader.Run(factory)
		stopOnShutdown(func() { moduleReloader.Stop() })
	}

	// Autodiscover (metricbeat.autodiscover)
	if bt.autodiscover != nil {
		bt.autodiscover.Start()
		stopOnShutdown(func() { bt.autodiscover.Stop() })
	}

	wg.Wait()
	return nil
}
// Stop signals to Metricbeat that it should stop by closing the "done"
// channel, which releases the goroutines started by Run that wait on it.
//
// The close is guarded by a sync.Once, so Stop may be called more than once;
// only the first call closes the channel.
func (bt *Metricbeat) Stop() {
	signalShutdown := func() {
		close(bt.done)
	}
	bt.stopOnce.Do(signalShutdown)
}
// Modules returns a list of all configured modules, as reported by
// module.ConfiguredModules for this instance's registry, static module
// configs, config-modules setting, module options, and logger.
func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) {
	staticModules := bt.config.Modules
	configModules := bt.config.ConfigModules
	return module.ConfiguredModules(bt.registry, staticModules, configModules, bt.moduleOptions, bt.logger)
}