in libbeat/cmd/instance/beat.go [265:502]
func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*Beat, error) {
b, err := NewBeat(settings.Name,
settings.IndexPrefix,
settings.Version,
settings.ElasticLicensed,
settings.Initialize)
if err != nil {
return nil, err
}
b.Info.LogConsumer = consumer
// begin code similar to configure
if err = plugin.Initialize(); err != nil {
return nil, fmt.Errorf("error initializing plugins: %w", err)
}
b.InputQueueSize = settings.InputQueueSize
cfOpts := []ucfg.Option{
ucfg.PathSep("."),
ucfg.ResolveEnv,
ucfg.VarExp,
}
tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
if err != nil {
return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
}
cfg := (*config.C)(tmp)
if err := initPaths(cfg); err != nil {
return nil, fmt.Errorf("error initializing paths: %w", err)
}
// We have to initialize the keystore before any unpack or merging the cloud
// options.
store, err := LoadKeystore(cfg, b.Info.Beat)
if err != nil {
return nil, fmt.Errorf("could not initialize the keystore: %w", err)
}
if settings.DisableConfigResolver {
config.OverwriteConfigOpts(obfuscateConfigOpts())
} else if store != nil {
// TODO: Allow the options to be more flexible for dynamic changes
// note that if the store is nil it should be excluded as an option
config.OverwriteConfigOpts(configOptsWithKeystore(store))
}
b.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String())
b.Info.Monitoring.SetupRegistries()
b.keystore = store
b.Beat.Keystore = store
err = cloudid.OverwriteSettings(cfg)
if err != nil {
return nil, fmt.Errorf("error overwriting cloudid settings: %w", err)
}
b.RawConfig = cfg
err = cfg.Unpack(&b.Config)
if err != nil {
return nil, fmt.Errorf("error unpacking config data: %w", err)
}
logpConfig := logp.Config{}
logpConfig.AddCaller = true
logpConfig.Beat = b.Info.Name
logpConfig.Files.MaxSize = 1
if b.Config.Logging == nil {
b.Config.Logging = config.NewConfig()
}
if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
}
b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
if err != nil {
return nil, fmt.Errorf("error configuring beats logp: %w", err)
}
// extracting it here for ease of use
logger := b.Info.Logger
instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
}
b.Instrumentation = instrumentation
if err := promoteOutputQueueSettings(b); err != nil {
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
}
if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return nil, fmt.Errorf("could not parse features: %w", err)
}
b.RegisterHostname(features.FQDN())
b.Beat.Config = &b.Config.BeatConfig
if name := b.Config.Name; name != "" {
b.Info.Name = name
}
if err := common.SetTimestampPrecision(b.Config.TimestampPrecision); err != nil {
return nil, fmt.Errorf("error setting timestamp precision: %w", err)
}
// log paths values to help with troubleshooting
logger.Infof("%s", paths.Paths.String())
metaPath := paths.Resolve(paths.Data, "meta.json")
err = b.loadMeta(metaPath)
if err != nil {
return nil, fmt.Errorf("error loading meta data: %w", err)
}
logger.Infof("Beat ID: %v", b.Info.ID)
// Try to get the host's FQDN and set it.
h, err := sysinfo.Host()
if err != nil {
return nil, fmt.Errorf("failed to get host information: %w", err)
}
fqdnLookupCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
fqdn, err := h.FQDNWithContext(fqdnLookupCtx)
if err != nil {
// FQDN lookup is "best effort". We log the error, fallback to
// the OS-reported hostname, and move on.
logger.Warnf("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname)
b.Info.FQDN = b.Info.Hostname
} else {
b.Info.FQDN = fqdn
}
// initialize config manager
m, err := management.NewManager(b.Config.Management, b.Registry)
if err != nil {
return nil, fmt.Errorf("error creating new manager: %w", err)
}
b.Manager = m
if b.Manager.AgentInfo().Version != "" {
// During the manager initialization the client to connect to the agent is
// also initialized. That makes the beat to read information sent by the
// agent, which includes the AgentInfo with the agent's package version.
// Components running under agent should report the agent's package version
// as their own version.
// In order to do so b.Info.Version needs to be set to the version the agent
// sent. As this Beat instance is initialized much before the package
// version is received, it's overridden here. So far it's early enough for
// the whole beat to report the right version.
b.Info.Version = b.Manager.AgentInfo().Version
version.SetPackageVersion(b.Info.Version)
}
// build the user-agent string to be used by the outputs
b.GenerateUserAgent()
if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
return nil, fmt.Errorf("error checking raw config: %w", err)
}
b.Beat.BeatConfig, err = b.BeatConfig()
if err != nil {
return nil, fmt.Errorf("error setting BeatConfig: %w", err)
}
imFactory := settings.IndexManagement
if imFactory == nil {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM, logger)
}
b.IdxSupporter, err = imFactory(logger, b.Info, b.RawConfig)
if err != nil {
return nil, fmt.Errorf("error setting index supporter: %w", err)
}
b.Info.UseDefaultProcessors = useDefaultProcessors
processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processors, err = processingFactory(b.Info, logger.Named("processors"), b.RawConfig)
if err != nil {
return nil, fmt.Errorf("error creating processors: %w", err)
}
// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
if !outputEnabled {
if b.Manager.Enabled() {
logger.Info("Output is configured through Central Management")
} else {
return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
}
}
namespaceReg := b.Info.Monitoring.Namespace.GetRegistry()
reg := b.Info.Monitoring.StatsRegistry.GetRegistry("libbeat")
if reg == nil {
reg = b.Info.Monitoring.StatsRegistry.NewRegistry("libbeat")
}
tel := namespaceReg.GetRegistry("state")
if tel == nil {
tel = namespaceReg.NewRegistry("state")
}
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: tel,
Logger: logger.Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.makeOutputFactory(b.Config.Output)
pipelineSettings := pipeline.Settings{
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
}
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}
b.Registry.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
b.Publisher = publisher
return b, nil
}