internal/pkg/agent/application/monitoring/v1_monitor.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package monitoring
import (
"crypto/sha256"
"fmt"
"maps"
"math"
"net"
"net/url"
"os"
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
"time"
"unicode"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/utils"
koanfmaps "github.com/knadh/koanf/maps"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
)
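// Several of the keys below select values from the `agent.monitoring` section of the policy.
// A minimal, illustrative sketch of that section (values are examples, not defaults enforced here):
//
//	agent.monitoring:
//	  enabled: true
//	  use_output: default
//	  metrics_period: 30s
//	  failure_threshold: 7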
const (
// args: agent home path, unit id
logFileFormat = "%s/logs/%s"
// args: agent home path, unit id (Windows path separators)
logFileFormatWin = "%s\\logs\\%s"
// Windows named pipe used as the agent's own monitoring endpoint (takes no format args)
agentMbEndpointFileFormatWin = `npipe:///elastic-agent`
httpPlusPrefix = "http+"
httpPrefix = "http"
fileSchemePrefix = "file"
unixSchemePrefix = "unix"
defaultOutputName = "default"
outputsKey = "outputs"
inputsKey = "inputs"
idKey = "id"
agentKey = "agent"
monitoringKey = "monitoring"
useOutputKey = "use_output"
monitoringMetricsPeriodKey = "metrics_period"
failureThresholdKey = "failure_threshold"
monitoringOutput = "monitoring"
defaultMonitoringNamespace = "default"
agentName = "elastic-agent"
metricBeatName = "metricbeat"
fileBeatName = "filebeat"
monitoringMetricsUnitID = "metrics-monitoring"
monitoringFilesUnitsID = "filestream-monitoring"
windowsOS = "windows"
// metricset execution period used for the monitoring metrics inputs
// we set this to 60s to reduce the load/data volume on the monitoring cluster
defaultMetricsCollectionInterval = 60 * time.Second
// metricset stream failure threshold before the stream is marked as DEGRADED
// to avoid marking the agent degraded for transient errors, we set the default threshold to 5
defaultMetricsStreamFailureThreshold = uint(5)
)
var (
errNoOuputPresent = errors.New("outputs not part of the config")
supportedMetricsComponents = []string{"filebeat", "metricbeat", "apm-server", "auditbeat", "cloudbeat", "fleet-server", "heartbeat", "osquerybeat", "packetbeat", "pf-elastic-collector", "pf-elastic-symbolizer"}
supportedBeatsComponents = []string{"filebeat", "metricbeat", "apm-server", "fleet-server", "auditbeat", "cloudbeat", "heartbeat", "osquerybeat", "packetbeat", "pf-elastic-collector", "pf-elastic-symbolizer"}
)
// BeatsMonitor provides config values for monitoring of agent clients (beats, endpoint, etc)
// by injecting the monitoring config into an existing fleet config
type BeatsMonitor struct {
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
operatingSystem string
agentInfo info.Agent
}
// componentInfo is the information necessary to generate monitoring configuration for a component. We don't just use
// the Component struct here because we also want to generate configurations for the monitoring components themselves,
// but without generating the full Component for them.
type componentInfo struct {
ID string
BinaryName string
InputSpec *component.InputRuntimeSpec
Pid uint64
RuntimeManager component.RuntimeManager
}
type monitoringConfig struct {
C *monitoringCfg.MonitoringConfig `config:"agent.monitoring"`
}
// New creates a new BeatsMonitor instance.
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
return &BeatsMonitor{
enabled: enabled,
config: &monitoringConfig{
C: cfg,
},
operatingSystem: operatingSystem,
agentInfo: agentInfo,
}
}
// Enabled returns true if monitoring is enabled and at least one of logs and metrics should be collected.
func (b *BeatsMonitor) Enabled() bool {
return b.enabled && b.config.C.Enabled && (b.config.C.MonitorLogs || b.config.C.MonitorMetrics)
}
// Reload refreshes monitoring configuration.
func (b *BeatsMonitor) Reload(rawConfig *config.Config) error {
if !b.enabled {
// it's disabled regardless of config
return nil
}
if err := rawConfig.UnpackTo(&b.config); err != nil {
return errors.New(err, "failed to unpack monitoring config during reload")
}
return nil
}
// MonitoringConfig adds monitoring inputs to a configuration based on the retrieved list of components to run.
// args:
// policy: the existing config policy
// components: a list of the expected running components
// componentIDPidMap: a map of component IDs to the PIDs of the running components.
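// A hedged sketch of the shape of the returned configuration (contents depend on the policy
// and the running components):
//
//	agent: <copied from the policy>
//	outputs:
//	  monitoring: <the output referenced by agent.monitoring.use_output>
//	inputs:
//	  - id: filestream-monitoring-agent   # logs monitoring
//	  - id: metrics-monitoring-beats      # metrics monitoring
//	  - id: metrics-monitoring-agent
//	  - ...                               # system/metrics inputs for service components with a PID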
func (b *BeatsMonitor) MonitoringConfig(
policy map[string]interface{},
components []component.Component,
componentIDPidMap map[string]uint64,
) (map[string]interface{}, error) {
if !b.Enabled() {
return nil, nil
}
cfg := make(map[string]interface{})
monitoringOutputName := defaultOutputName
metricsCollectionIntervalString := b.config.C.MetricsPeriod
failureThreshold := b.config.C.FailureThreshold
if agentCfg, found := policy[agentKey]; found {
// The agent section is required for feature flags
cfg[agentKey] = agentCfg
agentCfgMap, ok := agentCfg.(map[string]interface{})
if ok {
if monitoringCfg, found := agentCfgMap[monitoringKey]; found {
monitoringMap, ok := monitoringCfg.(map[string]interface{})
if ok {
if use, found := monitoringMap[useOutputKey]; found {
if useStr, ok := use.(string); ok {
monitoringOutputName = useStr
}
}
if metricsPeriod, found := monitoringMap[monitoringMetricsPeriodKey]; found {
if metricsPeriodStr, ok := metricsPeriod.(string); ok {
metricsCollectionIntervalString = metricsPeriodStr
}
}
if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
switch policyValue := policyFailureThresholdRaw.(type) {
case uint:
failureThreshold = &policyValue
case int:
if policyValue < 0 {
return nil, fmt.Errorf("converting policy failure threshold int to uint, value must be non-negative: %v", policyValue)
}
unsignedValue := uint(policyValue)
failureThreshold = &unsignedValue
case float64:
if policyValue < 0 || policyValue > math.MaxUint {
return nil, fmt.Errorf("converting policy failure threshold float64 to uint, value out of range: %v", policyValue)
}
truncatedUnsignedValue := uint(policyValue)
failureThreshold = &truncatedUnsignedValue
case string:
parsedPolicyValue, err := strconv.ParseUint(policyValue, 10, 64)
if err != nil {
return nil, fmt.Errorf("converting policy failure threshold string to uint: %w", err)
}
if parsedPolicyValue > math.MaxUint {
// this is to catch possible overflow in 32-bit envs, should not happen that often
return nil, fmt.Errorf("converting policy failure threshold from string to uint, value out of range: %v", policyValue)
}
uintPolicyValue := uint(parsedPolicyValue)
failureThreshold = &uintPolicyValue
default:
return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
}
}
}
}
}
}
componentInfos := b.getComponentInfos(components, componentIDPidMap)
if err := b.injectMonitoringOutput(policy, cfg, monitoringOutputName); err != nil && !errors.Is(err, errNoOuputPresent) {
return nil, errors.New(err, "failed to inject monitoring output")
} else if errors.Is(err, errNoOuputPresent) {
// nothing to inject, no monitoring output
return nil, nil
}
// initializes inputs collection so injectors don't have to deal with it
b.initInputs(cfg)
if b.config.C.MonitorLogs {
if err := b.injectLogsInput(cfg, componentInfos, monitoringOutput); err != nil {
return nil, errors.New(err, "failed to inject monitoring logs input")
}
}
if b.config.C.MonitorMetrics {
if err := b.injectMetricsInput(cfg, componentInfos, metricsCollectionIntervalString, failureThreshold); err != nil {
return nil, errors.New(err, "failed to inject monitoring metrics input")
}
}
return cfg, nil
}
// EnrichArgs enriches the arguments provided to the application, in order to enable
// monitoring
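//
// A hedged usage sketch (unit ID, binary and socket URL are illustrative): for a supported
// beat the returned slice gains sorted "-E key=value" pairs derived from
// ComponentMonitoringConfig, e.g.
//
//	monitor.EnrichArgs("filestream-monitoring", "filebeat", []string{"-e"})
//	// -> ["-e",
//	//     "-E", "http.enabled=true",
//	//     "-E", "http.host=unix://<socket path derived from the unit ID>"]
//	// plus "-E", "logging.metrics.enabled=false" when log metrics are disabled in the config.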
func (b *BeatsMonitor) EnrichArgs(unit, binary string, args []string) []string {
configMap := b.ComponentMonitoringConfig(unit, binary)
flattenedMap, _ := koanfmaps.Flatten(configMap, nil, ".")
appendix := make([]string, 0, 20)
keys := slices.Sorted(maps.Keys(flattenedMap))
for _, key := range keys {
value := flattenedMap[key]
appendix = append(appendix, "-E", fmt.Sprintf("%s=%v", key, value))
}
return append(args, appendix...)
}
// ComponentMonitoringConfig returns config for enabling monitoring in the component application.
// To be able to monitor a process implementing a component, we need to tell it if and how it should expose its telemetry.
// Other than enabling features, we set the unix domain socket name on which the application should start its
// monitoring http server.
func (b *BeatsMonitor) ComponentMonitoringConfig(unitID, binary string) map[string]any {
if !b.enabled {
// args are enriched even when monitoring is disabled in the config;
// disabling monitoring via the feature flag is the only way to skip enrichment, which this check handles
return nil
}
// only beats understand these flags
if !isSupportedBeatsBinary(binary) {
return nil
}
configMap := make(map[string]any)
endpoint := utils.SocketURLWithFallback(unitID, paths.TempDir())
if endpoint != "" {
httpConfigMap := map[string]any{
"enabled": true,
"host": endpoint,
}
if b.config.C.Pprof != nil && b.config.C.Pprof.Enabled {
httpConfigMap["pprof"] = map[string]any{
"enabled": true,
}
}
if b.config.C.HTTP != nil && b.config.C.HTTP.Buffer != nil && b.config.C.HTTP.Buffer.Enabled {
httpConfigMap["buffer"] = map[string]any{
"enabled": true,
}
}
configMap["http"] = httpConfigMap
}
if !b.config.C.LogMetrics {
configMap["logging"] = map[string]any{
"metrics": map[string]any{
"enabled": false,
},
}
}
return configMap
}
// Prepare executes steps in order for monitoring to work correctly
func (b *BeatsMonitor) Prepare(unit string) error {
if !b.Enabled() {
return nil
}
drops := make([]string, 0, 2)
if b.config.C.MonitorLogs {
logsDrop := loggingPath(unit, b.operatingSystem)
drops = append(drops, filepath.Dir(logsDrop))
}
if b.config.C.MonitorMetrics {
metricsDrop := monitoringDrop(utils.SocketURLWithFallback(unit, paths.TempDir()))
drops = append(drops, metricsDrop)
}
for _, drop := range drops {
if drop == "" {
continue
}
// skip if already exists
if _, err := os.Stat(drop); err != nil {
if !os.IsNotExist(err) {
return err
}
// create
if err := os.MkdirAll(drop, 0o775); err != nil {
return errors.New(err, fmt.Sprintf("failed to create directory %q", drop))
}
uid, gid := os.Geteuid(), os.Getegid()
if err := changeOwner(drop, uid, gid); err != nil {
return errors.New(err, fmt.Sprintf("failed to change owner of directory %q", drop))
}
}
}
return nil
}
// Cleanup removes files that were created for monitoring.
func (b *BeatsMonitor) Cleanup(unit string) error {
if !b.Enabled() {
return nil
}
endpoint := monitoringFile(unit)
if endpoint == "" {
return nil
}
return os.RemoveAll(endpoint)
}
func (b *BeatsMonitor) initInputs(cfg map[string]interface{}) {
_, found := cfg[inputsKey]
if found {
return
}
inputsCollection := make([]interface{}, 0)
cfg[inputsKey] = inputsCollection
}
func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{}, monitoringOutputName string) error {
outputsNode, found := source[outputsKey]
if !found {
return errNoOuputPresent
}
outputs, ok := outputsNode.(map[string]interface{})
if !ok {
return fmt.Errorf("outputs not a map")
}
outputNode, found := outputs[monitoringOutputName]
if !found {
return fmt.Errorf("output %q used for monitoring not found", monitoringOutputName)
}
monitoringOutputs := map[string]interface{}{
monitoringOutput: outputNode,
}
dest[outputsKey] = monitoringOutputs
return nil
}
// getComponentInfos returns a slice of componentInfo structs based on the provided components. This slice contains
// all the information needed to generate the monitoring configuration for these components, as well as configuration
// for new components which are going to be doing the monitoring.
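// For example, with both logs and metrics monitoring enabled, the result contains the policy
// components plus the monitoring components themselves: "beat/metrics-monitoring" and
// "http/metrics-monitoring" (metricbeat) and "filestream-monitoring" (filebeat), sorted by ID.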
func (b *BeatsMonitor) getComponentInfos(components []component.Component, componentIDPidMap map[string]uint64) []componentInfo {
componentInfos := make([]componentInfo, 0, len(components))
for _, comp := range components {
compInfo := componentInfo{
ID: comp.ID,
BinaryName: comp.BinaryName(),
InputSpec: comp.InputSpec,
RuntimeManager: comp.RuntimeManager,
}
if pid, ok := componentIDPidMap[comp.ID]; ok {
compInfo.Pid = pid
}
componentInfos = append(componentInfos, compInfo)
}
if b.config.C.MonitorMetrics {
componentInfos = append(componentInfos,
componentInfo{
ID: fmt.Sprintf("beat/%s", monitoringMetricsUnitID),
BinaryName: metricBeatName,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
},
componentInfo{
ID: fmt.Sprintf("http/%s", monitoringMetricsUnitID),
BinaryName: metricBeatName,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
})
}
if b.config.C.MonitorLogs {
componentInfos = append(componentInfos, componentInfo{
ID: monitoringFilesUnitsID,
BinaryName: fileBeatName,
RuntimeManager: component.RuntimeManager(b.config.C.RuntimeManager),
})
}
// sort the components to ensure a consistent order of inputs in the configuration
slices.SortFunc(componentInfos, func(a, b componentInfo) int {
return strings.Compare(a.ID, b.ID)
})
return componentInfos
}
// injectLogsInput adds logging configs for component monitoring to the `cfg` map
func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentInfos []componentInfo, monitoringOutput string) error {
logsDrop := filepath.Dir(loggingPath("unit", b.operatingSystem))
streams := []any{b.getAgentFilestreamStream(logsDrop)}
streams = append(streams, b.getServiceComponentFilestreamStreams(componentInfos)...)
input := map[string]interface{}{
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
useOutputKey: monitoringOutput,
"streams": streams,
}
// only set the experimental runtime field when a non-default runtime manager is configured,
// so configurations stay unchanged while the otel manager isn't enabled
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
input["_runtime_experimental"] = b.config.C.RuntimeManager
}
inputs := []any{input}
inputsNode, found := cfg[inputsKey]
if !found {
return fmt.Errorf("no inputs in config")
}
inputsCfg, ok := inputsNode.([]interface{})
if !ok {
return fmt.Errorf("inputs is not an array")
}
inputsCfg = append(inputsCfg, inputs...)
cfg[inputsKey] = inputsCfg
return nil
}
func (b *BeatsMonitor) monitoringNamespace() string {
if ns := b.config.C.Namespace; ns != "" {
return ns
}
return defaultMonitoringNamespace
}
// injectMetricsInput injects monitoring config for agent monitoring to the `cfg` object.
func (b *BeatsMonitor) injectMetricsInput(
cfg map[string]interface{},
componentInfos []componentInfo,
metricsCollectionIntervalString string,
failureThreshold *uint,
) error {
if metricsCollectionIntervalString == "" {
metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
}
if failureThreshold == nil {
defaultValue := defaultMetricsStreamFailureThreshold
failureThreshold = &defaultValue
}
monitoringNamespace := b.monitoringNamespace()
beatsStreams := b.getBeatsStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
httpStreams := b.getHttpStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
inputs := []interface{}{
map[string]interface{}{
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
"type": "beat/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
"namespace": monitoringNamespace,
},
"streams": beatsStreams,
},
map[string]interface{}{
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"type": "http/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
"namespace": monitoringNamespace,
},
"streams": httpStreams,
},
}
// only set the experimental runtime field when a non-default runtime manager is configured,
// so configurations stay unchanged while the otel manager isn't enabled
if b.config.C.RuntimeManager != monitoringCfg.DefaultRuntimeManager {
for _, input := range inputs {
inputMap := input.(map[string]interface{})
inputMap["_runtime_experimental"] = b.config.C.RuntimeManager
}
}
// add system/process metrics for services that can't be monitored via json/beats metrics
inputs = append(inputs, b.getServiceComponentProcessMetricInputs(
componentInfos, metricsCollectionIntervalString)...)
inputsNode, found := cfg[inputsKey]
if !found {
return fmt.Errorf("no inputs in config")
}
inputsCfg, ok := inputsNode.([]interface{})
if !ok {
return fmt.Errorf("inputs is not an array")
}
inputsCfg = append(inputsCfg, inputs...)
cfg[inputsKey] = inputsCfg
return nil
}
// getAgentFilestreamStream returns the filestream stream definition for collecting agent logs.
func (b *BeatsMonitor) getAgentFilestreamStream(logsDrop string) any {
monitoringNamespace := b.monitoringNamespace()
return map[string]any{
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
"paths": []interface{}{
filepath.Join(logsDrop, agentName+"-*.ndjson"),
filepath.Join(logsDrop, agentName+"-watcher-*.ndjson"),
},
"data_stream": map[string]interface{}{
"type": "logs",
"dataset": "elastic_agent",
"namespace": monitoringNamespace,
},
"close": map[string]interface{}{
"on_state_change": map[string]interface{}{
"inactive": "5m",
},
},
"parsers": []interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
"message_key": "message",
"overwrite_keys": true,
"add_error_key": true,
"target": "",
},
},
},
"processors": processorsForAgentFilestream(),
}
}
// getServiceComponentFilestreamStreams returns filestream stream definitions for collecting logs of components running as
// services.
func (b *BeatsMonitor) getServiceComponentFilestreamStreams(componentInfos []componentInfo) []any {
streams := []any{}
monitoringNamespace := b.monitoringNamespace()
// service components that define a log path are each monitored with their own stream in the monitor
for _, compInfo := range componentInfos {
if compInfo.InputSpec == nil || compInfo.InputSpec.Spec.Service == nil || compInfo.InputSpec.Spec.Service.Log == nil || compInfo.InputSpec.Spec.Service.Log.Path == "" {
// only monitor service inputs that define a log path
continue
}
sanitizedBinaryName := sanitizeName(compInfo.BinaryName) // conform with index naming policy
dataset := fmt.Sprintf("elastic_agent.%s", sanitizedBinaryName)
streams = append(streams, map[string]interface{}{
idKey: fmt.Sprintf("%s-%s", monitoringFilesUnitsID, compInfo.ID),
"type": "filestream",
"paths": []interface{}{
compInfo.InputSpec.Spec.Service.Log.Path,
},
"data_stream": map[string]interface{}{
"type": "logs",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"close": map[string]interface{}{
"on_state_change": map[string]interface{}{
"inactive": "5m",
},
},
"parsers": []interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
"message_key": "message",
"overwrite_keys": true,
"add_error_key": true,
"target": "",
},
},
},
"processors": processorsForServiceComponentFilestream(compInfo, dataset),
})
}
return streams
}
// getHttpStreams returns stream definitions for http/metrics inputs.
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getHttpStreams(
componentInfos []componentInfo,
failureThreshold *uint,
metricsCollectionIntervalString string,
) []any {
monitoringNamespace := b.monitoringNamespace()
sanitizedAgentName := sanitizeName(agentName)
indexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", sanitizedAgentName, monitoringNamespace)
dataset := fmt.Sprintf("elastic_agent.%s", sanitizedAgentName)
httpStreams := make([]any, 0, len(componentInfos))
agentStream := map[string]any{
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"json"},
"path": "/stats",
"hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.operatingSystem, b.config.C)},
"namespace": "agent",
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForAgentHttpStream(monitoringNamespace, dataset, b.agentInfo),
}
if failureThreshold != nil {
agentStream[failureThresholdKey] = *failureThreshold
}
httpStreams = append(httpStreams, agentStream)
for _, compInfo := range componentInfos {
binaryName := compInfo.BinaryName
if !isSupportedMetricsBinary(binaryName) {
continue
}
endpoints := []interface{}{prefixedEndpoint(utils.SocketURLWithFallback(compInfo.ID, paths.TempDir()))}
name := sanitizeName(binaryName)
httpStream := map[string]interface{}{
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"json"},
"hosts": endpoints,
"path": "/stats",
"namespace": "agent",
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForHttpStream(binaryName, compInfo.ID, dataset, b.agentInfo, compInfo.RuntimeManager),
}
if failureThreshold != nil {
httpStream[failureThresholdKey] = *failureThreshold
}
httpStreams = append(httpStreams, httpStream)
// specifically for filebeat, we include input metrics
// disabled for filebeat receiver until https://github.com/elastic/beats/issues/43418 is resolved
if strings.EqualFold(name, "filebeat") && compInfo.RuntimeManager != component.OtelRuntimeManager {
fbDataStreamName := "filebeat_input"
fbDataset := fmt.Sprintf("elastic_agent.%s", fbDataStreamName)
fbIndexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", fbDataStreamName, monitoringNamespace)
fbStream := map[string]any{
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fbDataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"json"},
"hosts": endpoints,
"path": "/inputs/",
"namespace": fbDataStreamName,
"json.is_array": true,
"period": metricsCollectionIntervalString,
"index": fbIndexName,
"processors": processorsForHttpStream(binaryName, compInfo.ID, fbDataset, b.agentInfo, compInfo.RuntimeManager),
}
if failureThreshold != nil {
fbStream[failureThresholdKey] = *failureThreshold
}
httpStreams = append(httpStreams, fbStream)
}
}
return httpStreams
}
// getBeatsStreams returns stream definitions for beats inputs.
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getBeatsStreams(
componentInfos []componentInfo,
failureThreshold *uint,
metricsCollectionIntervalString string,
) []any {
monitoringNamespace := b.monitoringNamespace()
beatsStreams := make([]any, 0, len(componentInfos))
for _, compInfo := range componentInfos {
binaryName := compInfo.BinaryName
if !isSupportedBeatsBinary(binaryName) {
continue
}
endpoints := []interface{}{prefixedEndpoint(utils.SocketURLWithFallback(compInfo.ID, paths.TempDir()))}
name := sanitizeName(binaryName)
dataset := fmt.Sprintf("elastic_agent.%s", name)
indexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace)
beatsStream := map[string]interface{}{
idKey: fmt.Sprintf("%s-", monitoringMetricsUnitID) + name,
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"stats"},
"hosts": endpoints,
"period": metricsCollectionIntervalString,
"index": indexName,
"processors": processorsForBeatsStream(binaryName, compInfo.ID, monitoringNamespace, dataset, b.agentInfo, compInfo.RuntimeManager),
}
if failureThreshold != nil {
beatsStream[failureThresholdKey] = *failureThreshold
}
beatsStreams = append(beatsStreams, beatsStream)
}
return beatsStreams
}
// getServiceComponentProcessMetricInputs returns input definitions for collecting process metrics of components
// running as services.
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
componentInfos []componentInfo,
metricsCollectionIntervalString string,
) []any {
monitoringNamespace := b.monitoringNamespace()
inputs := []any{}
for _, compInfo := range componentInfos {
if compInfo.InputSpec == nil || compInfo.InputSpec.Spec.Service == nil || compInfo.Pid == 0 {
continue
}
// If there's a checkin PID and the corresponding component has a service spec section, add a system/process config
name := sanitizeName(compInfo.BinaryName)
dataset := fmt.Sprintf("elastic_agent.%s", name)
input := map[string]interface{}{
idKey: fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name),
"name": fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name),
"type": "system/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
"namespace": monitoringNamespace,
},
"streams": []interface{}{
map[string]interface{}{
idKey: fmt.Sprintf("%s-%s", monitoringMetricsUnitID, name),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"process"},
"period": metricsCollectionIntervalString,
"index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace),
"process.pid": compInfo.Pid,
"process.cgroups.enabled": false,
"processors": processorsForProcessMetrics(name, compInfo.ID, monitoringNamespace, dataset, b.agentInfo),
},
},
}
inputs = append(inputs, input)
}
return inputs
}
// processorsForAgentFilestream returns processors used for agent logs in a filestream input.
func processorsForAgentFilestream() []any {
processors := []any{
// drop all events from monitoring components (do it early)
// without dropping these events the filestream input gets stuck in an infinite loop:
// if filestream hits an issue publishing the events it logs an error, which the filestream
// monitor then reads from the logs and tries to publish as well (hence the infinite loop).
dropEventsFromMonitoringComponentsProcessor(),
// drop periodic metrics logs (those are useful mostly in diagnostic dumps where we collect log files)
dropPeriodicMetricsLogsProcessor(),
}
// if the event is from a component, use the component's dataset
processors = append(processors, useComponentDatasetProcessors()...)
processors = append(processors,
// coming from logger, added by agent (drop)
dropEcsVersionFieldProcessor(),
// adjust destination data_stream based on the data_stream fields
addFormattedIndexProcessor(),
)
return processors
}
// processorsForServiceComponentFilestream returns processors used for filestream streams for components running as
// services.
func processorsForServiceComponentFilestream(compInfo componentInfo, dataset string) []any {
return []interface{}{
map[string]interface{}{
// component information must be injected because it's not a subprocess
"add_fields": map[string]interface{}{
"target": "component",
"fields": map[string]interface{}{
"id": compInfo.ID,
"type": compInfo.InputSpec.InputType,
"binary": compInfo.BinaryName,
"dataset": dataset,
},
},
},
map[string]interface{}{
// injecting component log source to stay aligned with command runtime logs
"add_fields": map[string]interface{}{
"target": "log",
"fields": map[string]interface{}{
"source": compInfo.ID,
},
},
},
}
}
// processorsForProcessMetrics returns processors used for process metrics.
func processorsForProcessMetrics(binaryName, unitID, namespace, dataset string, agentInfo info.Agent) []any {
return []any{
addDataStreamFieldsProcessor(dataset, namespace),
addEventFieldsProcessor(dataset),
addElasticAgentFieldsProcessor(binaryName, agentInfo),
addAgentFieldsProcessor(agentInfo.AgentID()),
addComponentFieldsProcessor(binaryName, unitID),
}
}
// processorsForBeatsStream returns the processors used for metric streams in the beats input.
func processorsForBeatsStream(
binaryName, unitID, namespace, dataset string,
agentInfo info.Agent,
runtimeManager component.RuntimeManager,
) []any {
processors := []any{
addDataStreamFieldsProcessor(dataset, namespace),
addEventFieldsProcessor(dataset),
addElasticAgentFieldsProcessor(binaryName, agentInfo),
addAgentFieldsProcessor(agentInfo.AgentID()),
addComponentFieldsProcessor(binaryName, unitID),
}
if runtimeManager == component.OtelRuntimeManager { // we don't want process metrics for beats receivers
fieldsToDrop := []any{
"beat.stats.cgroup",
"beat.stats.cpu",
"beat.stats.handles",
"beat.stats.memstats",
"beat.stats.runtime",
}
processors = append(processors, map[string]interface{}{
"drop_fields": map[string]interface{}{
"fields": fieldsToDrop,
"ignore_missing": true,
},
})
}
return processors
}
// processorsForHttpStream returns the processors used for metric streams in the http/metrics input.
func processorsForHttpStream(binaryName, unitID, dataset string, agentInfo info.Agent, runtimeManager component.RuntimeManager) []any {
sanitizedName := sanitizeName(binaryName)
fieldsToDrop := []any{"http"}
if runtimeManager == component.OtelRuntimeManager { // we don't want process metrics for beats receivers
fieldsToDrop = append(fieldsToDrop, "system")
}
return []interface{}{
addEventFieldsProcessor(dataset),
addElasticAgentFieldsProcessor(sanitizedName, agentInfo),
addAgentFieldsProcessor(agentInfo.AgentID()),
addCopyFieldsProcessor(httpCopyRules(), true, false),
dropFieldsProcessor(fieldsToDrop, true),
addComponentFieldsProcessor(binaryName, unitID),
}
}
// processorsForAgentHttpStream returns the processors used for the agent metric stream in the beats input.
func processorsForAgentHttpStream(namespace, dataset string, agentInfo info.Agent) []any {
return []interface{}{
addDataStreamFieldsProcessor(dataset, namespace),
addEventFieldsProcessor(dataset),
addElasticAgentFieldsProcessor(agentName, agentInfo),
addAgentFieldsProcessor(agentInfo.AgentID()),
addCopyFieldsProcessor(httpCopyRules(), true, false),
dropFieldsProcessor([]any{"http"}, true),
addComponentFieldsProcessor(agentName, agentName),
}
}
// addElasticAgentFieldsProcessor returns a processor definition that adds agent information in an `elastic_agent` field.
func addElasticAgentFieldsProcessor(binaryName string, agentInfo info.Agent) map[string]any {
return map[string]any{
"add_fields": map[string]any{
"target": "elastic_agent",
"fields": map[string]any{
"id": agentInfo.AgentID(),
"version": agentInfo.Version(),
"snapshot": agentInfo.Snapshot(),
"process": binaryName,
},
},
}
}
// addAgentFieldsProcessor returns a processor definition that adds the agent ID under an `agent.id` field.
func addAgentFieldsProcessor(agentID string) map[string]any {
return map[string]interface{}{
"add_fields": map[string]interface{}{
"target": "agent",
"fields": map[string]interface{}{
"id": agentID,
},
},
}
}
// addComponentFieldsProcessor returns a processor definition that adds component information.
func addComponentFieldsProcessor(binaryName, unitID string) map[string]any {
return map[string]interface{}{
"add_fields": map[string]interface{}{
"target": "component",
"fields": map[string]interface{}{
"id": unitID,
"binary": binaryName,
},
},
}
}
// addDataStreamFieldsProcessor returns a processor definition that adds datastream information.
func addDataStreamFieldsProcessor(dataset, namespace string) map[string]any {
return map[string]interface{}{
"add_fields": map[string]interface{}{
"target": "data_stream",
"fields": map[string]interface{}{
"type": "metrics",
"dataset": dataset,
"namespace": namespace,
},
},
}
}
// addEventFieldsProcessor returns a processor definition that adds an `event.dataset` field.
func addEventFieldsProcessor(dataset string) map[string]any {
return map[string]interface{}{
"add_fields": map[string]interface{}{
"target": "event",
"fields": map[string]interface{}{
"dataset": dataset,
},
},
}
}
// addCopyFieldsProcessor returns a processor that copies fields according to the provided rules.
func addCopyFieldsProcessor(copyRules []any, ignoreMissing bool, failOnError bool) map[string]any {
return map[string]interface{}{
"copy_fields": map[string]interface{}{
"fields": copyRules,
"ignore_missing": ignoreMissing,
"fail_on_error": failOnError,
},
}
}
// dropFieldsProcessor returns a processor which drops the provided fields.
func dropFieldsProcessor(fields []any, ignoreMissing bool) map[string]any {
return map[string]interface{}{
"drop_fields": map[string]interface{}{
"fields": fields,
"ignore_missing": ignoreMissing,
},
}
}
// dropEventsFromMonitoringComponentsProcessor returns a processor which drops all events from monitoring components.
// We identify monitoring components by looking at their IDs. They all end in `-monitoring`, e.g.:
// - "beat/metrics-monitoring"
// - "filestream-monitoring"
// - "http/metrics-monitoring"
func dropEventsFromMonitoringComponentsProcessor() map[string]any {
return map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"regexp": map[string]interface{}{
"component.id": ".*-monitoring$",
},
},
},
}
}
// dropPeriodicMetricsLogsProcessor returns a processor which drops logs about periodic metrics. This is done by
// matching on the start of the log message.
func dropPeriodicMetricsLogsProcessor() map[string]any {
return map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"regexp": map[string]interface{}{
"message": "^Non-zero metrics in the last",
},
},
},
}
}
// useComponentDatasetProcessors returns a list of processors which replace data_stream.dataset with component.dataset
// if the latter is set. It also sets event.dataset to the same value. This is used to ensure logs from components
// routed to the elastic-agent logger get sent to the component-specific dataset.
func useComponentDatasetProcessors() []any {
return []any{
// copy original dataset so we can drop the dataset field
map[string]any{
"copy_fields": map[string]any{
"fields": []any{
map[string]any{
"from": "data_stream.dataset",
"to": "data_stream.dataset_original",
},
},
},
},
// drop the dataset field so following copy_field can copy to it
map[string]any{
"drop_fields": map[string]any{
"fields": []any{
"data_stream.dataset",
},
},
},
// copy component.dataset as the real dataset
map[string]any{
"copy_fields": map[string]any{
"fields": []any{
map[string]any{
"from": "component.dataset",
"to": "data_stream.dataset",
},
},
"fail_on_error": false,
"ignore_missing": true,
},
},
// possible it's a log message from agent itself (doesn't have component.dataset)
map[string]any{
"copy_fields": map[string]any{
"when": map[string]any{
"not": map[string]any{
"has_fields": []any{
"data_stream.dataset",
},
},
},
"fields": []any{
map[string]any{
"from": "data_stream.dataset_original",
"to": "data_stream.dataset",
},
},
"fail_on_error": false,
},
},
// drop the original dataset copied and the event.dataset (as it will be updated)
map[string]any{
"drop_fields": map[string]any{
"fields": []any{
"data_stream.dataset_original",
"event.dataset",
},
},
},
// update event.dataset with the now used data_stream.dataset
map[string]any{
"copy_fields": map[string]any{
"fields": []any{
map[string]any{
"from": "data_stream.dataset",
"to": "event.dataset",
},
},
},
},
}
}
// dropEcsVersionFieldProcessor returns a processor which drops the ecs.version field from the event.
func dropEcsVersionFieldProcessor() map[string]any {
return map[string]any{
"drop_fields": map[string]any{
"fields": []any{
"ecs.version",
},
"ignore_missing": true,
},
}
}
// addFormattedIndexProcessor returns a processor which sets the destination index for an event based on a format string.
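// For example, an event carrying data_stream fields {type: "logs", dataset: "elastic_agent.filebeat",
// namespace: "default"} is routed to the index "logs-elastic_agent.filebeat-default".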
func addFormattedIndexProcessor() map[string]any {
return map[string]any{
"add_formatted_index": map[string]any{
"index": "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}",
},
}
}
// sanitizeName sanitizes the input name to make it a valid part of ES index names.
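// Examples:
//
//	sanitizeName("apm-server")   // "apm_server"
//	sanitizeName("beat/metrics") // "beat_metrics"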
func sanitizeName(name string) string {
return strings.ReplaceAll(strings.ReplaceAll(name, "-", "_"), "/", "_")
}
func loggingPath(id, operatingSystem string) string {
id = strings.ReplaceAll(id, string(filepath.Separator), "-")
if operatingSystem == windowsOS {
return fmt.Sprintf(logFileFormatWin, paths.Home(), id)
}
return fmt.Sprintf(logFileFormat, paths.Home(), id)
}
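// prefixedEndpoint prepends the "http+" prefix to endpoints that do not already start with "http".
// Examples (endpoints illustrative):
//
//	prefixedEndpoint("unix:///tmp/agent.sock") // "http+unix:///tmp/agent.sock"
//	prefixedEndpoint("http://localhost:6791")  // unchanged, already http-prefixed
//	prefixedEndpoint("")                       // ""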
func prefixedEndpoint(endpoint string) string {
if endpoint == "" || strings.HasPrefix(endpoint, httpPlusPrefix) || strings.HasPrefix(endpoint, httpPrefix) {
return endpoint
}
return httpPlusPrefix + endpoint
}
func monitoringFile(id string) string {
endpoint := utils.SocketURLWithFallback(id, paths.TempDir())
if endpoint == "" {
return ""
}
if isNpipe(endpoint) {
return ""
}
if isWindowsPath(endpoint) {
return endpoint
}
u, _ := url.Parse(endpoint)
if u == nil || (u.Scheme != "" && u.Scheme != fileSchemePrefix && u.Scheme != unixSchemePrefix) {
return ""
}
if u.Scheme == fileSchemePrefix {
return strings.TrimPrefix(endpoint, "file://")
}
if u.Scheme == unixSchemePrefix {
return strings.TrimPrefix(endpoint, "unix://")
}
return endpoint
}
func isNpipe(path string) bool {
return strings.HasPrefix(path, "npipe") || strings.HasPrefix(path, `\\.\pipe\`)
}
func isWindowsPath(path string) bool {
if len(path) < 4 {
return false
}
return unicode.IsLetter(rune(path[0])) && path[1] == ':'
}
func changeOwner(path string, uid, gid int) error {
if runtime.GOOS == windowsOS {
// on windows it always returns the syscall.EWINDOWS error, wrapped in *PathError
return nil
}
return os.Chown(path, uid, gid)
}
// HttpPlusAgentMonitoringEndpoint provides an agent monitoring endpoint path with an `http+` prefix.
func HttpPlusAgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string {
return prefixedEndpoint(AgentMonitoringEndpoint(operatingSystem, cfg))
}
// AgentMonitoringEndpoint provides an agent monitoring endpoint path.
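// Examples of returned values (host, port and paths are illustrative):
//
//	monitoring enabled in cfg:   "http://" + the configured HTTP host:port, e.g. "http://localhost:6791"
//	Windows, monitoring off:     "npipe:///elastic-agent"
//	other OSes, monitoring off:  "unix://<tempdir>/elastic-agent.sock", or a hashed
//	                             "unix:///tmp/elastic-agent/<sha256>.sock" if that path is too long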
func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string {
if cfg != nil && cfg.Enabled {
return "http://" + net.JoinHostPort(cfg.HTTP.Host, strconv.Itoa(cfg.HTTP.Port))
}
if operatingSystem == windowsOS {
return agentMbEndpointFileFormatWin
}
// unix socket path must be less than 104 characters
path := fmt.Sprintf("unix://%s.sock", filepath.Join(paths.TempDir(), agentName))
if len(path) < 104 {
return path
}
// place in global /tmp to ensure that it's small enough to fit; the current path is way too long
// to be used, but it needs to be unique per Agent (in case multiple are running)
return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path)))
}
func httpCopyRules() []interface{} {
fromToMap := []interface{}{
// I should be able to see the CPU usage on the running machine. Am I using too much CPU?
map[string]interface{}{
"from": "http.agent.beat.cpu",
"to": "system.process.cpu",
},
// I should be able to see the Memory usage of Elastic Agent. Is the Elastic Agent using too much memory?
map[string]interface{}{
"from": "http.agent.beat.memstats.memory_sys",
"to": "system.process.memory.size",
},
// I should be able to see fd usage. Am I keeping too many files open?
map[string]interface{}{
"from": "http.agent.beat.handles",
"to": "system.process.fd",
},
// Cgroup reporting
map[string]interface{}{
"from": "http.agent.beat.cgroup",
"to": "system.process.cgroup",
},
// apm-server specific
map[string]interface{}{
"from": "http.agent.apm-server",
"to": "apm-server",
},
// I should be able to see the filebeat input metrics
map[string]interface{}{
"from": "http.filebeat_input",
"to": "filebeat_input",
},
}
return fromToMap
}
func isSupportedMetricsBinary(binaryName string) bool {
for _, supportedBinary := range supportedMetricsComponents {
if strings.EqualFold(supportedBinary, binaryName) {
return true
}
}
return false
}
func isSupportedBeatsBinary(binaryName string) bool {
for _, supportedBinary := range supportedBeatsComponents {
if strings.EqualFold(supportedBinary, binaryName) {
return true
}
}
return false
}
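// monitoringDrop returns the directory that should be created for a monitoring endpoint,
// or "" when no directory is needed. Examples (paths illustrative):
//
//	monitoringDrop("http+unix:///var/run/agent.sock") // "/var/run"
//	monitoringDrop("npipe:///agent")                  // "" (npipe is virtual)
//	monitoringDrop("http://localhost:6791/stats")     // "" (localhost endpoints have no drop)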
func monitoringDrop(path string) (drop string) {
defer func() {
if drop != "" {
// the Dir call changes the separator to the one used by the OS
// '/var/lib' -> '\var\lib\' on windows
baseLen := len(filepath.Dir(drop))
drop = drop[:baseLen]
}
}()
if strings.Contains(path, "localhost") {
return ""
}
path = strings.TrimPrefix(path, httpPlusPrefix)
// npipe is virtual without a drop
if isNpipe(path) {
return ""
}
if isWindowsPath(path) {
return path
}
u, _ := url.Parse(path)
if u == nil || (u.Scheme != "" && u.Scheme != fileSchemePrefix && u.Scheme != unixSchemePrefix) {
return ""
}
if u.Scheme == fileSchemePrefix {
return strings.TrimPrefix(path, "file://")
}
if u.Scheme == unixSchemePrefix {
return strings.TrimPrefix(path, "unix://")
}
return path
}