internal/beatcmd/beat.go (723 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package beatcmd import ( "context" "encoding/json" "errors" "fmt" "io" "log/slog" "os" "os/user" "runtime" "runtime/debug" "strconv" "strings" "time" "github.com/gofrs/uuid/v5" "go.elastic.co/apm/module/apmotel/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/exp/zapslog" "golang.org/x/sync/errgroup" "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/licenser" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/report" "github.com/elastic/beats/v7/libbeat/monitoring/report/log" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/pprof" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/report/buffer" "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/elastic-agent-libs/service" libversion "github.com/elastic/elastic-agent-libs/version" "github.com/elastic/elastic-agent-system-metrics/metric/system/host" metricreport "github.com/elastic/elastic-agent-system-metrics/report" sysinfo "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" "github.com/elastic/apm-server/internal/version" ) const ( defaultMonitoringUsername = "apm_system" ) // Beat provides the runnable and configurable instance of a beat. type Beat struct { beat.Beat Config *Config rawConfig *config.C newRunner NewRunnerFunc tracerProvider trace.TracerProvider metricReader *sdkmetric.ManualReader meterProvider *sdkmetric.MeterProvider metricGatherer *apmotel.Gatherer } // BeatParams holds parameters for NewBeat. type BeatParams struct { // NewRunner holds a NewRunnerFunc for creating a Runner. // // If (Fleet) management is enabled, NewRunner may be called multiple // times, whenever configuration is reloaded. Otherwise, NewRunner will // be called once with the initial, static, configuration. NewRunner NewRunnerFunc // ElasticLicensed indicates whether this build of APM Server // is licensed with the Elastic License v2. ElasticLicensed bool } // NewBeat creates a new Beat. func NewBeat(args BeatParams) (*Beat, error) { cfg, rawConfig, keystore, err := LoadConfig() if err != nil { return nil, err } hostname, err := os.Hostname() if err != nil { return nil, err } beatName := cfg.Name if beatName == "" { beatName = hostname } exporter, err := apmotel.NewGatherer() if err != nil { return nil, err } metricReader := sdkmetric.NewManualReader() meterProvider := sdkmetric.NewMeterProvider( sdkmetric.WithReader(exporter), sdkmetric.WithReader(metricReader), ) otel.SetMeterProvider(meterProvider) eid := uuid.FromStringOrNil(metricreport.EphemeralID().String()) b := &Beat{ Beat: beat.Beat{ Info: beat.Info{ Beat: "apm-server", ElasticLicensed: args.ElasticLicensed, IndexPrefix: "apm-server", Version: version.VersionWithQualifier(), Name: beatName, Hostname: hostname, StartTime: time.Now(), EphemeralID: eid, }, Keystore: keystore, Config: &beat.BeatConfig{Output: cfg.Output}, BeatConfig: cfg.APMServer, Registry: reload.NewRegistry(), }, Config: cfg, newRunner: args.NewRunner, rawConfig: rawConfig, metricReader: metricReader, meterProvider: meterProvider, metricGatherer: &exporter, } if err := b.init(); err != nil { return nil, err } return b, nil } // init initializes logging, config management, GOMAXPROCS, and GC percent. func (b *Beat) init() error { if err := configureLogging(b.Config); err != nil { return fmt.Errorf("failed to configure logging: %w", err) } b.Beat.Info.Logger = logp.NewLogger("") // log paths values to help with troubleshooting b.Info.Logger.Infof("%s", paths.Paths.String()) // Load the unique ID and "first start" info from meta.json. metaPath := paths.Resolve(paths.Data, "meta.json") if err := b.loadMeta(metaPath); err != nil { return err } b.Info.Logger.Infof("Beat ID: %v", b.Info.ID) // Initialize central config manager. manager, err := management.NewManager(b.Config.Management, b.Registry) if err != nil { return err } b.Manager = manager if maxProcs := b.Config.MaxProcs; maxProcs > 0 { b.Info.Logger.Infof("Set max procs limit: %v", maxProcs) runtime.GOMAXPROCS(maxProcs) } if gcPercent := b.Config.GCPercent; gcPercent > 0 { b.Info.Logger.Infof("Set gc percentage to: %v", gcPercent) debug.SetGCPercent(gcPercent) } return nil } func (b *Beat) loadMeta(metaPath string) error { type meta struct { UUID uuid.UUID `json:"uuid"` FirstStart time.Time `json:"first_start"` } b.Info.Logger.Debugf("beat", "Beat metadata path: %v", metaPath) f, err := openRegular(metaPath) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("meta file failed to open: %w", err) } if err == nil { var m meta if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF { f.Close() return fmt.Errorf("Beat meta file reading error: %w", err) } f.Close() b.Info.FirstStart = m.FirstStart b.Info.ID = m.UUID } rewrite := false if b.Info.FirstStart.IsZero() { b.Info.FirstStart = b.Info.StartTime rewrite = true } if b.Info.ID == uuid.Nil { id, err := uuid.NewV4() if err != nil { return err } b.Info.ID = id rewrite = true } if !rewrite { return nil } // meta.json does not exist, or the contents are invalid: write a new file. // // Write a temporary file, and then atomically move it into place in case // of errors occurring half way through. encodedMeta, err := json.Marshal(meta{ UUID: b.Info.ID, FirstStart: b.Info.FirstStart, }) if err != nil { return fmt.Errorf("failed to encode metadata: %w", err) } tempFile := metaPath + ".new" if err := os.WriteFile(tempFile, encodedMeta, 0600); err != nil { return fmt.Errorf("failed to write metadata: %w", err) } // move temporary file into final location return file.SafeFileRotate(metaPath, tempFile) } func openRegular(filename string) (*os.File, error) { f, err := os.Open(filename) if err != nil { return f, err } info, err := f.Stat() if err != nil { f.Close() return nil, err } if !info.Mode().IsRegular() { f.Close() if info.IsDir() { return nil, fmt.Errorf("%s is a directory", filename) } return nil, fmt.Errorf("%s is not a regular file", filename) } return f, nil } func (b *Beat) Run(ctx context.Context) error { defer b.Info.Logger.Sync() defer func() { if r := recover(); r != nil { b.Info.Logger.Fatalw("exiting due to panic", "panic", r, zap.Stack("stack"), ) } }() defer b.Info.Logger.Infof("%s stopped.", b.Info.Beat) if runtime.GOOS == "darwin" { if host, err := sysinfo.Host(); err != nil { b.Info.Logger.Warnf("failed to retrieve kernel version, ignoring potential deprecation warning: %v", err) } else if strings.HasPrefix(host.Info().KernelVersion, "19.") { // macOS 10.15.x (catalina) means darwin kernel 19.y b.Info.Logger.Warn("deprecation notice: support for macOS 10.15 will be removed in an upcoming version") } } ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) defer g.Wait() // ensure all goroutines exit before Run returns defer cancel() // Try to acquire exclusive lock on data path to prevent another beat instance // sharing same data path. locker := newLocker(b) if err := locker.lock(); err != nil { return err } defer locker.unlock() service.BeforeRun() defer service.Cleanup() b.registerMetrics() // Start the libbeat API server for serving stats, state, etc. var apiServer *api.Server if b.Config.HTTP.Enabled() { var err error apiServer, err = api.NewWithDefaultRoutes(b.Info.Logger, b.Config.HTTP, api.NamespaceLookupFunc()) if err != nil { return fmt.Errorf("could not start the HTTP server for the API: %w", err) } apiServer.Start() defer apiServer.Stop() if b.Config.HTTPPprof.IsEnabled() { pprof.SetRuntimeProfilingParameters(b.Config.HTTPPprof) if err := pprof.HttpAttach(b.Config.HTTPPprof, apiServer); err != nil { return fmt.Errorf("failed to attach http handlers for pprof: %w", err) } } } monitoringReporter, err := b.setupMonitoring() if err != nil { return err } if monitoringReporter != nil { defer monitoringReporter.Stop() } if b.Config.MetricLogging != nil && b.Config.MetricLogging.Enabled() { reporter, err := log.MakeReporter(b.Info, b.Config.MetricLogging) if err != nil { return err } defer reporter.Stop() } // If enabled, collect metrics into a ring buffer. // // TODO(axw) confirm that this is used by Elastic Agent. If not, remove it? // This is not mentioned in our docs. if b.Config.HTTP.Enabled() && monitoring.IsBufferEnabled(b.Config.BufferConfig) { buffReporter, err := buffer.MakeReporter(b.Config.BufferConfig) if err != nil { return err } defer buffReporter.Stop() if err := apiServer.AttachHandler("/buffer", buffReporter); err != nil { return err } } g.Go(func() error { return adjustMaxProcs(ctx, 30*time.Second, b.Info.Logger) }) slogger := slog.New(zapslog.NewHandler(b.Info.Logger.Core())) if err := adjustMemlimit(30*time.Second, slogger); err != nil { return err } logSystemInfo(b.Info) cleanup, err := b.registerElasticsearchVersionCheck() if err != nil { return err } defer cleanup() cleanup, err = b.registerClusterUUIDFetching() if err != nil { return err } defer cleanup() if err := metricreport.SetupMetrics(b.Info.Logger.Named("metrics"), b.Info.Beat, b.Info.Version); err != nil { return err } if b.Manager.Enabled() { reloader, err := NewReloader(b.Info, b.Registry, b.newRunner, b.meterProvider, b.metricGatherer, b.tracerProvider) if err != nil { return err } g.Go(func() error { return reloader.Run(ctx) }) b.Manager.SetStopCallback(cancel) if err := b.Manager.Start(); err != nil { return fmt.Errorf("failed to start manager: %w", err) } defer b.Manager.Stop() } else { if !b.Config.Output.IsSet() { return errors.New("no output defined, please define one under the output section") } runner, err := b.newRunner(RunnerParams{ Config: b.rawConfig, Info: b.Info, Logger: b.Info.Logger, MeterProvider: b.meterProvider, MetricsGatherer: b.metricGatherer, }) if err != nil { return err } g.Go(func() error { return runner.Run(ctx) }) } b.Info.Logger.Infof("%s started.", b.Info.Beat) if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) { return err } return nil } // registerMetrics registers metrics with the internal monitoring API. This data // is then exposed through the HTTP monitoring endpoint (e.g. /info and /state) // and/or pushed to Elasticsearch through the x-pack monitoring feature. func (b *Beat) registerMetrics() { b.registerInfoMetrics() b.registerStateMetrics() b.registerStatsMetrics() } func (b *Beat) registerInfoMetrics() { infoRegistry := monitoring.GetNamespace("info").GetRegistry() monitoring.NewString(infoRegistry, "version").Set(b.Info.Version) monitoring.NewString(infoRegistry, "beat").Set(b.Info.Beat) monitoring.NewString(infoRegistry, "name").Set(b.Info.Name) monitoring.NewString(infoRegistry, "hostname").Set(b.Info.Hostname) monitoring.NewString(infoRegistry, "uuid").Set(b.Info.ID.String()) monitoring.NewString(infoRegistry, "ephemeral_id").Set(b.Info.EphemeralID.String()) monitoring.NewString(infoRegistry, "binary_arch").Set(runtime.GOARCH) monitoring.NewString(infoRegistry, "build_commit").Set(version.CommitHash()) monitoring.NewTimestamp(infoRegistry, "build_time").Set(version.CommitTime()) monitoring.NewBool(infoRegistry, "elastic_licensed").Set(b.Info.ElasticLicensed) // Add user metadata data asynchronously (on Windows the lookup can take up to 60s). go func() { if u, err := user.Current(); err != nil { // This usually happens if the user UID does not exist in /etc/passwd. It might be the case on K8S // if the user set securityContext.runAsUser to an arbitrary value. monitoring.NewString(infoRegistry, "uid").Set(strconv.Itoa(os.Getuid())) monitoring.NewString(infoRegistry, "gid").Set(strconv.Itoa(os.Getgid())) } else { monitoring.NewString(infoRegistry, "username").Set(u.Username) monitoring.NewString(infoRegistry, "uid").Set(u.Uid) monitoring.NewString(infoRegistry, "gid").Set(u.Gid) } }() } func (b *Beat) registerStateMetrics() { stateRegistry := monitoring.GetNamespace("state").GetRegistry() // state.service serviceRegistry := stateRegistry.NewRegistry("service") monitoring.NewString(serviceRegistry, "version").Set(b.Info.Version) monitoring.NewString(serviceRegistry, "name").Set(b.Info.Beat) monitoring.NewString(serviceRegistry, "id").Set(b.Info.ID.String()) // state.beat beatRegistry := stateRegistry.NewRegistry("beat") monitoring.NewString(beatRegistry, "name").Set(b.Info.Name) // state.host monitoring.NewFunc(stateRegistry, "host", host.ReportInfo("" /* don't use FQDN */), monitoring.Report) // state.management managementRegistry := stateRegistry.NewRegistry("management") monitoring.NewBool(managementRegistry, "enabled").Set(b.Manager.Enabled()) } func (b *Beat) registerStatsMetrics() { libbeatRegistry := monitoring.Default.GetRegistry("libbeat") monitoring.NewFunc(libbeatRegistry, "output", func(_ monitoring.Mode, v monitoring.Visitor) { var rm metricdata.ResourceMetrics if err := b.metricReader.Collect(context.Background(), &rm); err != nil { return } v.OnRegistryStart() defer v.OnRegistryFinished() for _, sm := range rm.ScopeMetrics { switch { case sm.Scope.Name == "github.com/elastic/go-docappender": monitoring.ReportString(v, "type", "elasticsearch") addDocappenderLibbeatOutputMetrics(context.Background(), v, sm) } } }) monitoring.NewFunc(libbeatRegistry, "pipeline", func(_ monitoring.Mode, v monitoring.Visitor) { var rm metricdata.ResourceMetrics if err := b.metricReader.Collect(context.Background(), &rm); err != nil { return } v.OnRegistryStart() defer v.OnRegistryFinished() for _, sm := range rm.ScopeMetrics { switch { case sm.Scope.Name == "github.com/elastic/go-docappender": addDocappenderLibbeatPipelineMetrics(context.Background(), v, sm) } } }) monitoring.NewFunc(monitoring.Default, "output.elasticsearch", func(_ monitoring.Mode, v monitoring.Visitor) { var rm metricdata.ResourceMetrics if err := b.metricReader.Collect(context.Background(), &rm); err != nil { return } v.OnRegistryStart() defer v.OnRegistryFinished() for _, sm := range rm.ScopeMetrics { switch { case sm.Scope.Name == "github.com/elastic/go-docappender": addDocappenderOutputElasticsearchMetrics(context.Background(), v, sm) } } }) monitoring.NewFunc(monitoring.Default, "apm-server", func(_ monitoring.Mode, v monitoring.Visitor) { var rm metricdata.ResourceMetrics if err := b.metricReader.Collect(context.Background(), &rm); err != nil { return } v.OnRegistryStart() defer v.OnRegistryFinished() for _, sm := range rm.ScopeMetrics { switch { case strings.HasPrefix(sm.Scope.Name, "github.com/elastic/apm-server"): // All simple scalar metrics that begin with the name "apm-server." // in github.com/elastic/apm-server/... scopes are mapped directly. addAPMServerMetrics(v, sm) } } }) } // getScalarInt64 returns a single-value, dimensionless // gauge or counter integer value, or (0, false) if the // data does not match these constraints. func getScalarInt64(data metricdata.Aggregation) (int64, bool) { switch data := data.(type) { case metricdata.Sum[int64]: if len(data.DataPoints) != 1 || data.DataPoints[0].Attributes.Len() != 0 { break } return data.DataPoints[0].Value, true case metricdata.Gauge[int64]: if len(data.DataPoints) != 1 || data.DataPoints[0].Attributes.Len() != 0 { break } return data.DataPoints[0].Value, true } return 0, false } func addAPMServerMetrics(v monitoring.Visitor, sm metricdata.ScopeMetrics) { beatsMetrics := make(map[string]any) for _, m := range sm.Metrics { if suffix, ok := strings.CutPrefix(m.Name, "apm-server."); ok { if value, ok := getScalarInt64(m.Data); ok { current := beatsMetrics suffixSlice := strings.Split(suffix, ".") for i := 0; i < len(suffixSlice)-1; i++ { k := suffixSlice[i] if _, ok := current[k]; !ok { current[k] = make(map[string]any) } if currentmap, ok := current[k].(map[string]any); ok { current = currentmap } } current[suffixSlice[len(suffixSlice)-1]] = value } } } reportOnKey(v, beatsMetrics) } func reportOnKey(v monitoring.Visitor, m map[string]any) { for key, value := range m { if valueMap, ok := value.(map[string]any); ok { v.OnRegistryStart() v.OnKey(key) reportOnKey(v, valueMap) v.OnRegistryFinished() } if valueMetric, ok := value.(int64); ok { monitoring.ReportInt(v, key, valueMetric) } } } // Adapt go-docappender's OTel metrics to beats stack monitoring metrics, // with a mixture of libbeat-specific and apm-server specific metric names. func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) { var writeBytes int64 v.OnRegistryStart() v.OnKey("events") for _, m := range sm.Metrics { switch m.Name { case "elasticsearch.events.processed": var acked, toomany, failed int64 data, _ := m.Data.(metricdata.Sum[int64]) for _, dp := range data.DataPoints { status, ok := dp.Attributes.Value(attribute.Key("status")) if !ok { continue } switch status.AsString() { case "Success": acked += dp.Value case "TooMany": toomany += dp.Value fallthrough default: failed += dp.Value } } monitoring.ReportInt(v, "acked", acked) monitoring.ReportInt(v, "failed", failed) monitoring.ReportInt(v, "toomany", toomany) case "elasticsearch.events.count": if value, ok := getScalarInt64(m.Data); ok { monitoring.ReportInt(v, "total", value) } case "elasticsearch.events.queued": if value, ok := getScalarInt64(m.Data); ok { monitoring.ReportInt(v, "active", value) } case "elasticsearch.flushed.bytes": if value, ok := getScalarInt64(m.Data); ok { writeBytes = value } case "elasticsearch.bulk_requests.count": if value, ok := getScalarInt64(m.Data); ok { monitoring.ReportInt(v, "batches", value) } } } v.OnRegistryFinished() if writeBytes > 0 { v.OnRegistryStart() v.OnKey("write") monitoring.ReportInt(v, "bytes", writeBytes) v.OnRegistryFinished() } } func addDocappenderLibbeatPipelineMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) { v.OnRegistryStart() defer v.OnRegistryFinished() v.OnKey("events") for _, m := range sm.Metrics { switch m.Name { case "elasticsearch.events.count": if value, ok := getScalarInt64(m.Data); ok { monitoring.ReportInt(v, "total", value) } } } } // Add non-libbeat Elasticsearch output metrics under "output.elasticsearch". func addDocappenderOutputElasticsearchMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) { var bulkRequestsCount, bulkRequestsAvailable int64 var indexersCreated, indexersDestroyed int64 for _, m := range sm.Metrics { switch m.Name { case "elasticsearch.bulk_requests.count": if value, ok := getScalarInt64(m.Data); ok { bulkRequestsCount = value } case "elasticsearch.bulk_requests.available": if value, ok := getScalarInt64(m.Data); ok { bulkRequestsAvailable = value } case "elasticsearch.indexer.created": if value, ok := getScalarInt64(m.Data); ok { indexersCreated = value } case "elasticsearch.indexer.destroyed": if value, ok := getScalarInt64(m.Data); ok { indexersDestroyed = value } } } v.OnRegistryStart() v.OnKey("bulk_requests") monitoring.ReportInt(v, "completed", bulkRequestsCount) monitoring.ReportInt(v, "available", bulkRequestsAvailable) v.OnRegistryFinished() v.OnRegistryStart() v.OnKey("indexers") monitoring.ReportInt(v, "created", indexersCreated) monitoring.ReportInt(v, "destroyed", indexersDestroyed) monitoring.ReportInt(v, "active", indexersCreated-indexersDestroyed+1) v.OnRegistryFinished() } // registerElasticsearchVerfication registers a global callback to make sure // the Elasticsearch instance we are connecting to has a valid license, and is // at least on the same version as APM Server. // // registerElasticsearchVerification returns a cleanup function which must be // called on shutdown. func (b *Beat) registerElasticsearchVersionCheck() (func(), error) { uuid, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { if err := licenser.FetchAndVerify(conn); err != nil { return err } esVersion := conn.GetVersion() beatVersion, err := libversion.New(b.Info.Version) if err != nil { return err } if esVersion.LessThanMajorMinor(beatVersion) { return fmt.Errorf( "%w Elasticsearch: %s, APM Server: %s", elasticsearch.ErrTooOld, esVersion.String(), b.Info.Version, ) } return nil }) if err != nil { return nil, err } return func() { elasticsearch.DeregisterGlobalCallback(uuid) }, nil } func (b *Beat) registerClusterUUIDFetching() (func(), error) { callback := b.clusterUUIDFetchingCallback() uuid, err := elasticsearch.RegisterConnectCallback(callback) if err != nil { return nil, err } return func() { elasticsearch.DeregisterConnectCallback(uuid) }, nil } // Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring func (b *Beat) clusterUUIDFetchingCallback() elasticsearch.ConnectCallback { stateRegistry := monitoring.GetNamespace("state").GetRegistry() elasticsearchRegistry := stateRegistry.NewRegistry("outputs.elasticsearch") clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid") callback := func(esClient *eslegclient.Connection) error { var response struct { ClusterUUID string `json:"cluster_uuid"` } status, body, err := esClient.Request("GET", "/", "", nil, nil) if err != nil { return fmt.Errorf("error querying /: %w", err) } if status > 299 { return fmt.Errorf("error querying /. Status: %d. Response body: %s", status, body) } err = json.Unmarshal(body, &response) if err != nil { return fmt.Errorf("error unmarshaling json when querying /. Body: %s", body) } clusterUUIDRegVar.Set(response.ClusterUUID) return nil } return callback } func (b *Beat) setupMonitoring() (report.Reporter, error) { monitoringCfg := b.Config.Monitoring monitoringClusterUUID, err := monitoring.GetClusterUUID(monitoringCfg) if err != nil { return nil, err } // Expose monitoring.cluster_uuid in state API if monitoringClusterUUID != "" { stateRegistry := monitoring.GetNamespace("state").GetRegistry() monitoringRegistry := stateRegistry.NewRegistry("monitoring") clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid") clusterUUIDRegVar.Set(monitoringClusterUUID) } if monitoring.IsEnabled(monitoringCfg) { err := monitoring.OverrideWithCloudSettings(monitoringCfg) if err != nil { return nil, err } settings := report.Settings{ DefaultUsername: defaultMonitoringUsername, ClusterUUID: monitoringClusterUUID, } reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output) if err != nil { return nil, err } return reporter, nil } return nil, nil } // logSystemInfo logs information about this system for situational awareness // in debugging. This information includes data about the beat, build, go // runtime, host, and process. If any of the data is not available it will be // omitted. func logSystemInfo(info beat.Info) { defer info.Logger.Recover("An unexpected error occurred while collecting " + "information about the system.") log := info.Logger.Named("beat").With(logp.Namespace("system_info")) // Beat beat := mapstr.M{ "type": info.Beat, "uuid": info.ID, "path": mapstr.M{ "config": paths.Resolve(paths.Config, ""), "data": paths.Resolve(paths.Data, ""), "home": paths.Resolve(paths.Home, ""), "logs": paths.Resolve(paths.Logs, ""), }, } log.Infow("Beat info", "beat", beat) // Build build := mapstr.M{ "commit": version.CommitHash(), "time": version.CommitTime(), "version": info.Version, } log.Infow("Build info", "build", build) // Go Runtime log.Infow("Go runtime info", "go", sysinfo.Go()) // Host if host, err := sysinfo.Host(); err == nil { log.Infow("Host info", "host", host.Info()) } // Process if self, err := sysinfo.Self(); err == nil { process := mapstr.M{} if info, err := self.Info(); err == nil { process["name"] = info.Name process["pid"] = info.PID process["ppid"] = info.PPID process["cwd"] = info.CWD process["exe"] = info.Exe process["start_time"] = info.StartTime } if proc, ok := self.(types.Seccomp); ok { if seccomp, err := proc.Seccomp(); err == nil { process["seccomp"] = seccomp } } if proc, ok := self.(types.Capabilities); ok { if caps, err := proc.Capabilities(); err == nil { process["capabilities"] = caps } } if len(process) > 0 { log.Infow("Process info", "process", process) } } }