func()

in internal/beatcmd/reloader.go [216:297]


func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C) error {
	var outputNamespace config.Namespace
	if outputConfig != nil {
		if err := outputConfig.Unpack(&outputNamespace); err != nil {
			return err
		}
	}
	if inputConfig == nil || !outputNamespace.IsSet() {
		// Wait until both input and output have been received.
		// apm tracing config is not mandatory so not waiting for it
		return nil
	}
	select {
	case <-r.stopped:
		// The process is shutting down: ignore reloads.
		return nil
	default:
	}

	wrappedOutputConfig := config.MustNewConfigFrom(map[string]interface{}{
		"output": outputConfig,
	})

	var wrappedApmTracingConfig *config.C
	// apmTracingConfig is nil when disabled
	if apmTracingConfig != nil {
		c, err := apmTracingConfig.Child("elastic", -1)
		if err != nil {
			return fmt.Errorf("APM tracing config for elastic not found")
		}
		// set enabled manually as APMConfig doesn't contain it.
		c.SetBool("enabled", -1, true)
		wrappedApmTracingConfig = config.MustNewConfigFrom(map[string]interface{}{
			"instrumentation": c,
		})
	} else {
		// empty instrumentation config
		wrappedApmTracingConfig = config.NewConfig()
	}
	mergedConfig, err := config.MergeConfigs(inputConfig, wrappedOutputConfig, wrappedApmTracingConfig)
	if err != nil {
		return err
	}
	// Create a new runner. We separate creation from starting to
	// allow the runner to perform initialisations that must run
	// synchronously.
	newRunner, err := r.newRunner(RunnerParams{
		Config:          mergedConfig,
		Info:            r.info,
		Logger:          r.logger,
		TracerProvider:  r.tracerProvider,
		MeterProvider:   r.meterProvider,
		MetricsGatherer: r.metricGatherer,
	})
	if err != nil {
		return err
	}

	// Start the new runner.
	var g errgroup.Group
	ctx, cancel := context.WithCancel(context.Background())
	g.Go(func() error {
		if err := newRunner.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
			r.logger.With(logp.Error(err)).Error("runner returned with an error")
			return err
		}
		return nil
	})
	stopRunner := func() error {
		cancel()
		return g.Wait()
	}

	// Stop any existing runner.
	if r.runner != nil {
		_ = r.stopRunner() // logged above
	}
	r.runner = newRunner
	r.stopRunner = stopRunner

	return nil
}