translator/translate/otel/receiver/adapter/translators.go (195 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package adapter import ( "fmt" "log" "strings" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" translatorconfig "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/logs_collected/files" "github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/logs_collected/windows_events" "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/metrics_collect" collectd "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/metrics_collect/collectd" "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/metrics_collect/customizedmetrics" "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/metrics_collect/gpu" "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/metrics_collect/procstat" "github.com/aws/amazon-cloudwatch-agent/translator/translate/metrics/metrics_collect/statsd" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" ) const ( defaultMetricsCollectionInterval = time.Minute ebsPrefix = "ebs_" ) var ( logKey = common.ConfigKey(common.LogsKey, common.LogsCollectedKey) metricKey = common.ConfigKey(common.MetricsKey, common.MetricsCollectedKey) skipInputSet = collections.NewSet[string](files.SectionKey, windows_events.SectionKey) multipleInputSet = collections.NewSet[string](procstat.SectionKey) // Order by PidFile, ExeKey, Pattern Key according to the public documents // if multiple configuration is specified // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-procstat-process-metrics.html#CloudWatch-Agent-procstat-configuration procstatMonitoredSet = []string{ procstat.PidFileKey, procstat.ExeKey, procstat.PatternKey, } // windowsInputSet contains all the supported metric input plugins. All others are considered custom metrics. // An exception would be procstat metrics windowsInputSet = collections.NewSet[string]( gpu.SectionKey, statsd.SectionKey, ) // skipWindowsInputSet contains all the supported metric input plugins that should not be included in telegraf windows plugins skipWindowsInputSet = collections.NewSet[string]( common.JmxKey, ) // aliasMap contains mappings for all input plugins that use another // name in Telegraf. aliasMap = map[string]string{ collectd.SectionKey: collectd.SectionMappedKey, files.SectionKey: files.SectionMappedKey, gpu.SectionKey: gpu.SectionMappedKey, windows_events.SectionKey: windows_events.SectionMappedKey, } // defaultCollectionIntervalMap contains all input plugins that have a // different default interval. defaultCollectionIntervalMap = map[string]time.Duration{ statsd.SectionKey: 10 * time.Second, } // otelReceivers is used for receivers that need to be in the same pipeline that // exports to Cloudwatch while not having to follow the adapter rules otelReceivers = collections.NewSet[string](common.OtlpKey, common.JmxKey, common.PrometheusKey) ) // FindReceiversInConfig looks in the metrics and logs sections to determine which // plugins require adapter translators. Logs is processed first, so any // colliding metrics translators will override them. This follows the rule // setup. func FindReceiversInConfig(conf *confmap.Conf, os string) (common.TranslatorMap[component.Config, component.ID], error) { translators := common.NewTranslatorMap[component.Config, component.ID]() translators.Merge(fromLogs(conf)) metricTranslators, err := fromMetrics(conf, os) translators.Merge(metricTranslators) return translators, err } // fromMetrics creates adapter receiver translators based on the os-specific // metrics section in the config. func fromMetrics(conf *confmap.Conf, os string) (common.TranslatorMap[component.Config, component.ID], error) { translators := common.NewTranslatorMap[component.Config, component.ID]() switch os { case translatorconfig.OS_TYPE_LINUX, translatorconfig.OS_TYPE_DARWIN: translators.Merge(fromLinuxMetrics(conf)) case translatorconfig.OS_TYPE_WINDOWS: translators.Merge(fromWindowsMetrics(conf)) default: return nil, fmt.Errorf("unsupported OS: %v", os) } return translators, nil } // fromLinuxMetrics creates a translator for each subsection within the // metrics::metrics_collected section of the config. Can be anything. func fromLinuxMetrics(conf *confmap.Conf) common.TranslatorMap[component.Config, component.ID] { var validInputs map[string]bool if _, ok := conf.Get(common.ConfigKey(metricKey)).(map[string]interface{}); ok { rule := &metrics_collect.CollectMetrics{} rule.ApplyRule(conf.Get(common.ConfigKey(common.MetricsKey))) validInputs = rule.GetRegisteredMetrics() } return fromInputs(conf, validInputs, metricKey) } // fromWindowsMetrics creates a translator for each allow listed subsection // within the metrics::metrics_collected section. See windowsInputSet for // allow list. If non-allow-listed subsections exist, they will be grouped // under a windows performance counter adapter translator. func fromWindowsMetrics(conf *confmap.Conf) common.TranslatorMap[component.Config, component.ID] { translators := common.NewTranslatorMap[component.Config, component.ID]() if inputs, ok := conf.Get(metricKey).(map[string]interface{}); ok { for inputName := range inputs { if otelReceivers.Contains(inputName) { continue } if windowsInputSet.Contains(inputName) { cfgKey := common.ConfigKey(metricKey, inputName) translators.Set(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault( defaultCollectionIntervalMap, inputName, defaultMetricsCollectionInterval, ))) } else { translators.Merge(fromMultipleInput(conf, inputName, translatorconfig.OS_TYPE_WINDOWS)) } } } return translators } // fromLogs creates a translator for each subsection within logs::logs_collected // along with a socket listener translator if "emf" or "structuredlog" are present // within the logs:metrics_collected section. func fromLogs(conf *confmap.Conf) common.TranslatorMap[component.Config, component.ID] { return fromInputs(conf, nil, logKey) } // fromInputs converts all the keys in the section into adapter translators. func fromInputs(conf *confmap.Conf, validInputs map[string]bool, baseKey string) common.TranslatorMap[component.Config, component.ID] { translators := common.NewTranslatorMap[component.Config, component.ID]() if inputs, ok := conf.Get(baseKey).(map[string]interface{}); ok { for inputName := range inputs { if skipInputSet.Contains(inputName) { // logs agent is separate from otel agent continue } if validInputs != nil { if otelReceivers.Contains(inputName) { continue } else if _, ok := validInputs[inputName]; !ok { log.Printf("W! Ignoring unrecognized input %s", inputName) continue } } cfgKey := common.ConfigKey(baseKey, inputName) hasMeasurements := true noTelegrafReceivers := false if conf.IsSet(common.ConfigKey(cfgKey, common.MeasurementKey)) { inputConf := conf.Get(cfgKey) if inputConf != nil { measurement := common.GetMeasurements(inputConf.(map[string]any)) hasMeasurements = len(measurement) != 0 noTelegrafReceivers = containsOnlyNonAdaptedMetrics(inputName, measurement) } } if !hasMeasurements { log.Printf("W! Agent will not emit any metrics for %s due to empty measurement field ", inputName) continue } else if noTelegrafReceivers { // Skip adding the adapted translator because the metric is not being collected through the adapted receiver // Example is EBS NVMe metrics which has its own receiver, whereas the other diskio metrics are collected // using Telegraf (adapted receiver). continue } else if multipleInputSet.Contains(inputName) { translators.Merge(fromMultipleInput(conf, inputName, "")) } else { translators.Set(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault( defaultCollectionIntervalMap, inputName, defaultMetricsCollectionInterval, ))) } } } return translators } // fromMultipleInput generates multiple receivers with unique ID depends on the number of inputs. // Since there plugins from Telegraf that allows multiple inputs such as procstat, window_perf_counter; // therefore, generate a hash of the monitored process (e.g exe: hash(amazon-cloudwatch-agent)) // to provide a unique identifier for the receivers and easy in compare with the alias // https://github.com/influxdata/telegraf/blob/d8db3ca3a293bc24a9120b590984b09e2de1851a/models/running_input.go#L60 // and generate the appropriate running input when starting adapter func fromMultipleInput(conf *confmap.Conf, inputName, os string) common.TranslatorMap[component.Config, component.ID] { translators := common.NewTranslatorMap[component.Config, component.ID]() cfgKey := common.ConfigKey(metricKey, inputName) if inputName == procstat.SectionKey { /* For procstat metrics, telegraf allows and generates more than 2 inputs. [[inputs.procstat]] pattern = "ssm-agent" interval = "1s" fieldpass = ["memory_stack"] pid_finder = "native" [[inputs.procstat]] exe = "amazon-cloudwatch-agent" interval = "1s" fieldpass = ["cpu_time_system"] pid_finder = "native" */ for _, procStatKey := range common.GetArray[any](conf, cfgKey) { // Each of the procstat monitored process has their own process; therefore, overriding the interval key chain // and setting dirrectly psKey := procStatKey.(map[string]interface{}) psCollectionInterval, _ := common.ParseDuration(psKey[common.MetricsCollectionIntervalKey]) // Array type validation needs to be specific https://stackoverflow.com/a/47989212 for _, procstatMonitored := range procstatMonitoredSet { if componentPsValue, ok := psKey[procstatMonitored]; ok { translators.Set(NewTranslatorWithName( componentPsValue.(string), procstat.SectionKey, cfgKey, psCollectionInterval, defaultMetricsCollectionInterval)) break } } } } else if os == translatorconfig.OS_TYPE_WINDOWS && !windowsInputSet.Contains(inputName) && !skipWindowsInputSet.Contains(inputName) { /* For customized metrics from Windows and window performance counters metrics [[inputs.win_perf_counters.object]] ObjectName = "Processor" Instances = ["*"] Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"] Measurement = "win_cpu" [[inputs.win_perf_counters.object]] ObjectName = "LogicalDisk" Instances = ["*"] Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"] Measurement = "win_disk" */ translators.Set(NewTranslatorWithName( inputName, customizedmetrics.WinPerfCountersKey, cfgKey, time.Duration(0), defaultMetricsCollectionInterval, )) } return translators } // toAlias gets the alias for the input name if it has one. func toAlias(inputName string) string { return collections.GetOrDefault(aliasMap, inputName, inputName) } // containsOnlyNonAdaptedMetrics is used for when a section may contain metrics that are not // emitted through the adapted receiver. This function is used to check if only non-adaptable // metrics are configured for. func containsOnlyNonAdaptedMetrics(inputName string, measurements []string) bool { for _, m := range measurements { switch inputName { case common.DiskIOKey: trimmed := strings.TrimPrefix(m, common.DiskIOKey+"_") if !strings.HasPrefix(trimmed, ebsPrefix) { return false } default: return false } } return true }