func NewBeatReceiver()

in libbeat/cmd/instance/beat.go [265:502]


// NewBeatReceiver builds a *Beat from settings and the in-memory
// receiverConfig map and returns it with its publisher pipeline loaded.
//
// The steps run in a fixed order, because later steps read fields set by
// earlier ones: create the Beat with NewBeat, initialize plugins, convert
// receiverConfig to a config.C, initialize paths, load the keystore, apply
// cloud ID overrides, unpack the config into b.Config, configure the logger,
// set up instrumentation, promote output queue settings, apply feature flags,
// load meta data, resolve the FQDN, create the management manager, build the
// index-management and processing supporters, and finally load the publisher
// pipeline.
//
// Parameters:
//   - settings: supplies the arguments for NewBeat as well as InputQueueSize,
//     DisableConfigResolver, IndexManagement, ILM and Processing.
//   - receiverConfig: raw configuration, converted with ucfg using "." as the
//     path separator together with the ResolveEnv and VarExp options.
//   - useDefaultProcessors: stored in b.Info.UseDefaultProcessors before the
//     processors are created.
//   - consumer: stored in b.Info.LogConsumer.
//   - core: passed to logp.ConfigureWithCoreLocal along with the unpacked
//     logging configuration.
//
// On any failing step a nil Beat and an error are returned. The error from
// NewBeat is returned as is; all other errors are wrapped with a message
// naming the step. A failed FQDN lookup is not an error: it is logged and
// b.Info.Hostname is used instead. A missing or disabled output is an error
// unless b.Manager.Enabled() reports true.
//
// NOTE(review): objects created before a later failure (for example the
// instrumentation) are not explicitly released on the error paths; confirm
// whether callers rely on that.
func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*Beat, error) {
	b, err := NewBeat(settings.Name,
		settings.IndexPrefix,
		settings.Version,
		settings.ElasticLicensed,
		settings.Initialize)
	if err != nil {
		return nil, err
	}

	b.Info.LogConsumer = consumer

	// begin code similar to configure
	if err = plugin.Initialize(); err != nil {
		return nil, fmt.Errorf("error initializing plugins: %w", err)
	}

	b.InputQueueSize = settings.InputQueueSize

	// Options applied when converting the receiver config map into a ucfg
	// config: dotted key paths plus the ResolveEnv and VarExp options.
	cfOpts := []ucfg.Option{
		ucfg.PathSep("."),
		ucfg.ResolveEnv,
		ucfg.VarExp,
	}

	tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
	if err != nil {
		return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
	}

	// config.C is converted directly from *ucfg.Config.
	cfg := (*config.C)(tmp)
	if err := initPaths(cfg); err != nil {
		return nil, fmt.Errorf("error initializing paths: %w", err)
	}

	// We have to initialize the keystore before any unpack or merging the cloud
	// options.
	store, err := LoadKeystore(cfg, b.Info.Beat)
	if err != nil {
		return nil, fmt.Errorf("could not initialize the keystore: %w", err)
	}

	// Pick the config options to install: the obfuscating set when the
	// resolver is disabled, otherwise the keystore-backed set when a keystore
	// is available. With no keystore and the resolver enabled, the current
	// options are left untouched.
	// NOTE(review): OverwriteConfigOpts presumably replaces package-level
	// state shared by every Beat in the process; confirm.
	if settings.DisableConfigResolver {
		config.OverwriteConfigOpts(obfuscateConfigOpts())
	} else if store != nil {
		// TODO: Allow the options to be more flexible for dynamic changes
		// note that if the store is nil it should be excluded as an option
		config.OverwriteConfigOpts(configOptsWithKeystore(store))
	}

	// The monitoring namespace name is "<beat>-<id>".
	// NOTE(review): b.Info.ID is read here, before loadMeta runs further
	// down; if loadMeta changes the ID, the namespace keeps the earlier value.
	// Confirm this is intended.
	b.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String())

	b.Info.Monitoring.SetupRegistries()

	b.keystore = store
	b.Beat.Keystore = store
	err = cloudid.OverwriteSettings(cfg)
	if err != nil {
		return nil, fmt.Errorf("error overwriting cloudid settings: %w", err)
	}

	// Keep the raw config and unpack it into the typed b.Config.
	b.RawConfig = cfg
	err = cfg.Unpack(&b.Config)
	if err != nil {
		return nil, fmt.Errorf("error unpacking config data: %w", err)
	}

	// Logging values set here are in place before the logging section is
	// unpacked on top of them below.
	logpConfig := logp.Config{}
	logpConfig.AddCaller = true
	logpConfig.Beat = b.Info.Name
	logpConfig.Files.MaxSize = 1

	// Make sure there is a logging config to unpack, even when the receiver
	// config has no logging section.
	if b.Config.Logging == nil {
		b.Config.Logging = config.NewConfig()
	}

	if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
		return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
	}

	// Build the logger from the logging config and the caller-supplied core.
	b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
	if err != nil {
		return nil, fmt.Errorf("error configuring beats logp: %w", err)
	}
	// extracting it here for ease of use
	logger := b.Info.Logger

	// The local variable below shadows the instrumentation package for the
	// rest of the function; the package is not referenced again after this.
	instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger)
	if err != nil {
		return nil, fmt.Errorf("error setting up instrumentation: %w", err)
	}
	b.Instrumentation = instrumentation

	if err := promoteOutputQueueSettings(b); err != nil {
		return nil, fmt.Errorf("could not promote output queue settings: %w", err)
	}

	// Feature flags are read from the raw config first, because the hostname
	// registration right after depends on features.FQDN().
	if err := features.UpdateFromConfig(b.RawConfig); err != nil {
		return nil, fmt.Errorf("could not parse features: %w", err)
	}
	b.RegisterHostname(features.FQDN())

	b.Beat.Config = &b.Config.BeatConfig

	// A name set in the config replaces the default b.Info.Name.
	if name := b.Config.Name; name != "" {
		b.Info.Name = name
	}

	if err := common.SetTimestampPrecision(b.Config.TimestampPrecision); err != nil {
		return nil, fmt.Errorf("error setting timestamp precision: %w", err)
	}

	// log paths values to help with troubleshooting
	logger.Infof("%s", paths.Paths.String())

	// Load meta data from meta.json, resolved against the data path.
	metaPath := paths.Resolve(paths.Data, "meta.json")
	err = b.loadMeta(metaPath)
	if err != nil {
		return nil, fmt.Errorf("error loading meta data: %w", err)
	}

	logger.Infof("Beat ID: %v", b.Info.ID)

	// Try to get the host's FQDN and set it.
	h, err := sysinfo.Host()
	if err != nil {
		return nil, fmt.Errorf("failed to get host information: %w", err)
	}

	// The FQDN lookup is given a one-minute timeout. cancel is deferred, so
	// the context is released when this function returns.
	fqdnLookupCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancel()

	fqdn, err := h.FQDNWithContext(fqdnLookupCtx)
	if err != nil {
		// FQDN lookup is "best effort".  We log the error, fallback to
		// the OS-reported hostname, and move on.
		logger.Warnf("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname)
		b.Info.FQDN = b.Info.Hostname
	} else {
		b.Info.FQDN = fqdn
	}

	// initialize config manager
	m, err := management.NewManager(b.Config.Management, b.Registry)
	if err != nil {
		return nil, fmt.Errorf("error creating new manager: %w", err)
	}
	b.Manager = m

	if b.Manager.AgentInfo().Version != "" {
		// During the manager initialization the client to connect to the agent is
		// also initialized. That makes the beat to read information sent by the
		// agent, which includes the AgentInfo with the agent's package version.
		// Components running under agent should report the agent's package version
		// as their own version.
		// In order to do so b.Info.Version needs to be set to the version the agent
		// sent. As this Beat instance is initialized much before the package
		// version is received, it's overridden here. So far it's early enough for
		// the whole beat to report the right version.
		b.Info.Version = b.Manager.AgentInfo().Version
		version.SetPackageVersion(b.Info.Version)
	}

	// build the user-agent string to be used by the outputs
	b.GenerateUserAgent()

	if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
		return nil, fmt.Errorf("error checking raw config: %w", err)
	}

	b.Beat.BeatConfig, err = b.BeatConfig()
	if err != nil {
		return nil, fmt.Errorf("error setting BeatConfig: %w", err)
	}

	// Use the index-management factory from settings when provided, otherwise
	// the default one built from settings.ILM.
	imFactory := settings.IndexManagement
	if imFactory == nil {
		imFactory = idxmgmt.MakeDefaultSupport(settings.ILM, logger)
	}
	b.IdxSupporter, err = imFactory(logger, b.Info, b.RawConfig)
	if err != nil {
		return nil, fmt.Errorf("error setting index supporter: %w", err)
	}

	// UseDefaultProcessors is set before the factory call because b.Info is
	// passed to the processing factory below.
	b.Info.UseDefaultProcessors = useDefaultProcessors
	processingFactory := settings.Processing
	if processingFactory == nil {
		processingFactory = processing.MakeDefaultBeatSupport(true)
	}

	b.processors, err = processingFactory(b.Info, logger.Named("processors"), b.RawConfig)
	if err != nil {
		return nil, fmt.Errorf("error creating processors: %w", err)
	}

	// This should be replaced with static config for otel consumer
	// but need to figure out if we want the Queue settings from here.
	// An output must be both set and enabled, unless the manager reports that
	// it is enabled, in which case only an informational message is logged.
	outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
	if !outputEnabled {
		if b.Manager.Enabled() {
			logger.Info("Output is configured through Central Management")
		} else {
			return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
		}
	}

	// Look up the "libbeat" stats registry and the "state" registry of the
	// monitoring namespace, creating each one if it does not exist yet.
	namespaceReg := b.Info.Monitoring.Namespace.GetRegistry()
	reg := b.Info.Monitoring.StatsRegistry.GetRegistry("libbeat")
	if reg == nil {
		reg = b.Info.Monitoring.StatsRegistry.NewRegistry("libbeat")
	}

	tel := namespaceReg.GetRegistry("state")
	if tel == nil {
		tel = namespaceReg.NewRegistry("state")
	}
	monitors := pipeline.Monitors{
		Metrics:   reg,
		Telemetry: tel,
		Logger:    logger.Named("publisher"),
		Tracer:    b.Instrumentation.Tracer(),
	}

	outputFactory := b.makeOutputFactory(b.Config.Output)

	// Load the publisher pipeline with the processors and input queue size
	// prepared above.
	pipelineSettings := pipeline.Settings{
		Processors:     b.processors,
		InputQueueSize: b.InputQueueSize,
	}
	publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
	if err != nil {
		return nil, fmt.Errorf("error initializing publisher: %w", err)
	}
	// Register a reloader built from the publisher's output reloader with
	// b.Registry, then keep the publisher on the Beat.
	b.Registry.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
	b.Publisher = publisher

	return b, nil
}