in internal/beatcmd/beat.go [286:434]
func (b *Beat) Run(ctx context.Context) error {
defer b.Info.Logger.Sync()
defer func() {
if r := recover(); r != nil {
b.Info.Logger.Fatalw("exiting due to panic",
"panic", r,
zap.Stack("stack"),
)
}
}()
defer b.Info.Logger.Infof("%s stopped.", b.Info.Beat)
if runtime.GOOS == "darwin" {
if host, err := sysinfo.Host(); err != nil {
b.Info.Logger.Warnf("failed to retrieve kernel version, ignoring potential deprecation warning: %v", err)
} else if strings.HasPrefix(host.Info().KernelVersion, "19.") {
// macOS 10.15.x (catalina) means darwin kernel 19.y
b.Info.Logger.Warn("deprecation notice: support for macOS 10.15 will be removed in an upcoming version")
}
}
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
defer g.Wait() // ensure all goroutines exit before Run returns
defer cancel()
// Try to acquire exclusive lock on data path to prevent another beat instance
// sharing same data path.
locker := newLocker(b)
if err := locker.lock(); err != nil {
return err
}
defer locker.unlock()
service.BeforeRun()
defer service.Cleanup()
b.registerMetrics()
// Start the libbeat API server for serving stats, state, etc.
var apiServer *api.Server
if b.Config.HTTP.Enabled() {
var err error
apiServer, err = api.NewWithDefaultRoutes(b.Info.Logger, b.Config.HTTP, api.NamespaceLookupFunc())
if err != nil {
return fmt.Errorf("could not start the HTTP server for the API: %w", err)
}
apiServer.Start()
defer apiServer.Stop()
if b.Config.HTTPPprof.IsEnabled() {
pprof.SetRuntimeProfilingParameters(b.Config.HTTPPprof)
if err := pprof.HttpAttach(b.Config.HTTPPprof, apiServer); err != nil {
return fmt.Errorf("failed to attach http handlers for pprof: %w", err)
}
}
}
monitoringReporter, err := b.setupMonitoring()
if err != nil {
return err
}
if monitoringReporter != nil {
defer monitoringReporter.Stop()
}
if b.Config.MetricLogging != nil && b.Config.MetricLogging.Enabled() {
reporter, err := log.MakeReporter(b.Info, b.Config.MetricLogging)
if err != nil {
return err
}
defer reporter.Stop()
}
// If enabled, collect metrics into a ring buffer.
//
// TODO(axw) confirm that this is used by Elastic Agent. If not, remove it?
// This is not mentioned in our docs.
if b.Config.HTTP.Enabled() && monitoring.IsBufferEnabled(b.Config.BufferConfig) {
buffReporter, err := buffer.MakeReporter(b.Config.BufferConfig)
if err != nil {
return err
}
defer buffReporter.Stop()
if err := apiServer.AttachHandler("/buffer", buffReporter); err != nil {
return err
}
}
g.Go(func() error {
return adjustMaxProcs(ctx, 30*time.Second, b.Info.Logger)
})
slogger := slog.New(zapslog.NewHandler(b.Info.Logger.Core()))
if err := adjustMemlimit(30*time.Second, slogger); err != nil {
return err
}
logSystemInfo(b.Info)
cleanup, err := b.registerElasticsearchVersionCheck()
if err != nil {
return err
}
defer cleanup()
cleanup, err = b.registerClusterUUIDFetching()
if err != nil {
return err
}
defer cleanup()
if err := metricreport.SetupMetrics(b.Info.Logger.Named("metrics"), b.Info.Beat, b.Info.Version); err != nil {
return err
}
if b.Manager.Enabled() {
reloader, err := NewReloader(b.Info, b.Registry, b.newRunner, b.meterProvider, b.metricGatherer, b.tracerProvider)
if err != nil {
return err
}
g.Go(func() error { return reloader.Run(ctx) })
b.Manager.SetStopCallback(cancel)
if err := b.Manager.Start(); err != nil {
return fmt.Errorf("failed to start manager: %w", err)
}
defer b.Manager.Stop()
} else {
if !b.Config.Output.IsSet() {
return errors.New("no output defined, please define one under the output section")
}
runner, err := b.newRunner(RunnerParams{
Config: b.rawConfig,
Info: b.Info,
Logger: b.Info.Logger,
MeterProvider: b.meterProvider,
MetricsGatherer: b.metricGatherer,
})
if err != nil {
return err
}
g.Go(func() error { return runner.Run(ctx) })
}
b.Info.Logger.Infof("%s started.", b.Info.Beat)
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
}