func()

in internal/beatcmd/beat.go [286:434]


// Run starts the beat's supporting services (data path lock, HTTP API,
// monitoring and metrics reporters, Elasticsearch callbacks) and then runs
// either a Reloader (when b.Manager is enabled) or a single runner built
// from b.rawConfig.
//
// Run blocks until all goroutines started in its errgroup have returned.
// It returns the first setup error or the first goroutine error, except
// that an error matching context.Canceled is reported as nil.
func (b *Beat) Run(ctx context.Context) error {
	// Deferred calls run in LIFO order: the logger is synced last, after
	// the "stopped" message and any panic report have been logged.
	defer b.Info.Logger.Sync()
	defer func() {
		if r := recover(); r != nil {
			// Report the panic through the logger, with a stack trace,
			// at fatal level.
			b.Info.Logger.Fatalw("exiting due to panic",
				"panic", r,
				zap.Stack("stack"),
			)
		}
	}()
	defer b.Info.Logger.Infof("%s stopped.", b.Info.Beat)

	if runtime.GOOS == "darwin" {
		if host, err := sysinfo.Host(); err != nil {
			b.Info.Logger.Warnf("failed to retrieve kernel version, ignoring potential deprecation warning: %v", err)
		} else if strings.HasPrefix(host.Info().KernelVersion, "19.") {
			// macOS 10.15.x (catalina) means darwin kernel 19.y
			b.Info.Logger.Warn("deprecation notice: support for macOS 10.15 will be removed in an upcoming version")
		}
	}

	// cancel is deferred after g.Wait, so on return it runs first: the
	// goroutines are told to stop before Run waits for them. The reverse
	// order would block forever on an early error return.
	ctx, cancel := context.WithCancel(ctx)
	g, ctx := errgroup.WithContext(ctx)
	defer g.Wait() // ensure all goroutines exit before Run returns
	defer cancel()

	// Try to acquire exclusive lock on data path to prevent another beat instance
	// sharing same data path.
	locker := newLocker(b)
	if err := locker.lock(); err != nil {
		return err
	}
	defer locker.unlock()

	service.BeforeRun()
	defer service.Cleanup()

	b.registerMetrics()

	// Start the libbeat API server for serving stats, state, etc.
	var apiServer *api.Server
	if b.Config.HTTP.Enabled() {
		var err error
		apiServer, err = api.NewWithDefaultRoutes(b.Info.Logger, b.Config.HTTP, api.NamespaceLookupFunc())
		if err != nil {
			return fmt.Errorf("could not start the HTTP server for the API: %w", err)
		}
		apiServer.Start()
		defer apiServer.Stop()
		if b.Config.HTTPPprof.IsEnabled() {
			pprof.SetRuntimeProfilingParameters(b.Config.HTTPPprof)
			if err := pprof.HttpAttach(b.Config.HTTPPprof, apiServer); err != nil {
				return fmt.Errorf("failed to attach http handlers for pprof: %w", err)
			}
		}
	}

	monitoringReporter, err := b.setupMonitoring()
	if err != nil {
		return err
	}
	if monitoringReporter != nil {
		defer monitoringReporter.Stop()
	}

	// Periodic metrics logging is on by default: an absent (nil) config
	// section means "use the defaults", not "disabled", so nil has to be
	// checked explicitly rather than relying on Enabled() alone.
	// NOTE(review): this relies on log.MakeReporter accepting a nil config
	// and falling back to its defaults — confirm.
	if b.Config.MetricLogging == nil || b.Config.MetricLogging.Enabled() {
		reporter, err := log.MakeReporter(b.Info, b.Config.MetricLogging)
		if err != nil {
			return err
		}
		defer reporter.Stop()
	}

	// If enabled, collect metrics into a ring buffer.
	//
	// TODO(axw) confirm that this is used by Elastic Agent. If not, remove it?
	// This is not mentioned in our docs.
	//
	// apiServer is only created when HTTP is enabled, hence the first
	// half of the condition.
	if b.Config.HTTP.Enabled() && monitoring.IsBufferEnabled(b.Config.BufferConfig) {
		buffReporter, err := buffer.MakeReporter(b.Config.BufferConfig)
		if err != nil {
			return err
		}
		defer buffReporter.Stop()
		if err := apiServer.AttachHandler("/buffer", buffReporter); err != nil {
			return err
		}
	}

	g.Go(func() error {
		return adjustMaxProcs(ctx, 30*time.Second, b.Info.Logger)
	})

	// adjustMemlimit takes a *slog.Logger, so wrap the existing logger core.
	slogger := slog.New(zapslog.NewHandler(b.Info.Logger.Core()))
	if err := adjustMemlimit(30*time.Second, slogger); err != nil {
		return err
	}

	logSystemInfo(b.Info)

	cleanup, err := b.registerElasticsearchVersionCheck()
	if err != nil {
		return err
	}
	defer cleanup()

	// The cleanup variable is reused; the defer above has already captured
	// the previous function value, so both cleanups still run.
	cleanup, err = b.registerClusterUUIDFetching()
	if err != nil {
		return err
	}
	defer cleanup()

	if err := metricreport.SetupMetrics(b.Info.Logger.Named("metrics"), b.Info.Beat, b.Info.Version); err != nil {
		return err
	}

	if b.Manager.Enabled() {
		// Managed mode: the reloader creates runners from the registry.
		reloader, err := NewReloader(b.Info, b.Registry, b.newRunner, b.meterProvider, b.metricGatherer, b.tracerProvider)
		if err != nil {
			return err
		}
		g.Go(func() error { return reloader.Run(ctx) })

		// Let the manager stop the beat by cancelling the shared context.
		b.Manager.SetStopCallback(cancel)
		if err := b.Manager.Start(); err != nil {
			return fmt.Errorf("failed to start manager: %w", err)
		}
		defer b.Manager.Stop()
	} else {
		// Standalone mode: a single runner built from the raw config.
		if !b.Config.Output.IsSet() {
			return errors.New("no output defined, please define one under the output section")
		}
		// NOTE(review): the managed path hands b.tracerProvider to
		// NewReloader, but no tracer provider is passed here — confirm
		// whether RunnerParams should carry it in standalone mode too.
		runner, err := b.newRunner(RunnerParams{
			Config:          b.rawConfig,
			Info:            b.Info,
			Logger:          b.Info.Logger,
			MeterProvider:   b.meterProvider,
			MetricsGatherer: b.metricGatherer,
		})
		if err != nil {
			return err
		}
		g.Go(func() error { return runner.Run(ctx) })
	}
	b.Info.Logger.Infof("%s started.", b.Info.Beat)
	// Cancellation is the normal shutdown path, so it is not an error.
	if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return err
	}
	return nil
}