translator/translate/otel/common/common.go (397 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package common
import (
"container/list"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/pipeline"
"gopkg.in/yaml.v3"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
)
const (
AgentKey = "agent"
DebugKey = "debug"
MetricsKey = "metrics"
LogsKey = "logs"
TracesKey = "traces"
MetricsCollectedKey = "metrics_collected"
LogsCollectedKey = "logs_collected"
TracesCollectedKey = "traces_collected"
MetricsDestinationsKey = "metrics_destinations"
ECSKey = "ecs"
KubernetesKey = "kubernetes"
CloudWatchKey = "cloudwatch"
CloudWatchLogsKey = "cloudwatchlogs"
PrometheusKey = "prometheus"
PrometheusConfigPathKey = "prometheus_config_path"
AMPKey = "amp"
WorkspaceIDKey = "workspace_id"
EMFProcessorKey = "emf_processor"
DisableMetricExtraction = "disable_metric_extraction"
XrayKey = "xray"
OtlpKey = "otlp"
JmxKey = "jmx"
TLSKey = "tls"
Endpoint = "endpoint"
EndpointOverrideKey = "endpoint_override"
RegionOverrideKey = "region_override"
ProxyOverrideKey = "proxy_override"
InsecureKey = "insecure"
LocalModeKey = "local_mode"
CredentialsKey = "credentials"
RoleARNKey = "role_arn"
SigV4Auth = "sigv4auth"
MetricsCollectionIntervalKey = "metrics_collection_interval"
AggregationDimensionsKey = "aggregation_dimensions"
MeasurementKey = "measurement"
DropOriginalMetricsKey = "drop_original_metrics"
ForceFlushIntervalKey = "force_flush_interval"
ContainerInsightsMetricGranularity = "metric_granularity" // replaced with enhanced_container_insights
EnhancedContainerInsights = "enhanced_container_insights"
ResourcesKey = "resources"
PreferFullPodName = "prefer_full_pod_name"
EnableAcceleratedComputeMetric = "accelerated_compute_metrics"
EnableKueueContainerInsights = "kueue_container_insights"
AppendDimensionsKey = "append_dimensions"
Console = "console"
DiskKey = "disk"
DiskIOKey = "diskio"
NetKey = "net"
Emf = "emf"
StructuredLog = "structuredlog"
ServiceAddress = "service_address"
Udp = "udp"
Tcp = "tcp"
TlsKey = "tls"
Tags = "tags"
Region = "region"
LogGroupName = "log_group_name"
LogStreamName = "log_stream_name"
NameKey = "name"
RenameKey = "rename"
UnitKey = "unit"
)
const (
CollectDMetricKey = "collectd"
CollectDPluginKey = "socket_listener"
CPUMetricKey = "cpu"
DiskMetricKey = "disk"
DiskIoMetricKey = "diskio"
StatsDMetricKey = "statsd"
SwapMetricKey = "swap"
MemMetricKey = "mem"
NetMetricKey = "net"
NetStatMetricKey = "netstat"
ProcessMetricKey = "process"
ProcStatMetricKey = "procstat"
//Windows Plugins
MemMetricKeyWindows = "Memory"
LogicalDiskMetricKeyWindows = "LogicalDisk"
NetworkMetricKeyWindows = "Network Interface"
PagingMetricKeyWindows = "Paging"
PhysicalDiskMetricKeyWindows = "PhysicalDisk"
ProcessorMetricKeyWindows = "Processor"
SystemMetricKeyWindows = "System"
TCPv4MetricKeyWindows = "TCPv4"
TCPv6MetricKeyWindows = "TCPv6"
)
const (
PipelineNameHost = "host"
PipelineNameHostCustomMetrics = "hostCustomMetrics"
PipelineNameHostDeltaMetrics = "hostDeltaMetrics"
PipelineNameHostOtlpMetrics = "hostOtlpMetrics"
PipelineNameContainerInsights = "containerinsights"
PipelineNameJmx = "jmx"
PipelineNameContainerInsightsJmx = "containerinsightsjmx"
PipelineNameEmfLogs = "emf_logs"
PipelineNamePrometheus = "prometheus"
PipelineNameKueue = "kueueContainerInsights"
AppSignals = "application_signals"
AppSignalsFallback = "app_signals"
AppSignalsRules = "rules"
)
var (
AppSignalsTraces = ConfigKey(TracesKey, TracesCollectedKey, AppSignals)
AppSignalsMetrics = ConfigKey(LogsKey, MetricsCollectedKey, AppSignals)
AppSignalsTracesFallback = ConfigKey(TracesKey, TracesCollectedKey, AppSignalsFallback)
AppSignalsMetricsFallback = ConfigKey(LogsKey, MetricsCollectedKey, AppSignalsFallback)
AppSignalsConfigKeys = map[pipeline.Signal][]string{
pipeline.SignalTraces: {AppSignalsTraces, AppSignalsTracesFallback},
pipeline.SignalMetrics: {AppSignalsMetrics, AppSignalsMetricsFallback},
}
JmxConfigKey = ConfigKey(MetricsKey, MetricsCollectedKey, JmxKey)
ContainerInsightsConfigKey = ConfigKey(LogsKey, MetricsCollectedKey, KubernetesKey)
JmxTargets = []string{"activemq", "cassandra", "hbase", "hadoop", "jetty", "jvm", "kafka", "kafka-consumer", "kafka-producer", "solr", "tomcat", "wildfly"}
AgentDebugConfigKey = ConfigKey(AgentKey, DebugKey)
MetricsAggregationDimensionsKey = ConfigKey(MetricsKey, AggregationDimensionsKey)
OTLPLogsKey = ConfigKey(LogsKey, MetricsCollectedKey, OtlpKey)
OTLPMetricsKey = ConfigKey(MetricsKey, MetricsCollectedKey, OtlpKey)
)
type TranslatorID interface {
component.ID | pipeline.ID
Name() string
}
// Translator is used to translate the JSON config into an
// OTEL config.
type Translator[C any, ID TranslatorID] interface {
Translate(*confmap.Conf) (C, error)
ID() ID
}
// TranslatorMap is a set of translators by their types.
type TranslatorMap[C any, ID TranslatorID] interface {
// Set a translator to the map. If the ID is already present, replaces the translator.
// Otherwise, adds it to the end of the list.
Set(Translator[C, ID])
// Get the translator for the component.ID.
Get(ID) (Translator[C, ID], bool)
// Merge another translator map in.
Merge(TranslatorMap[C, ID])
// Keys is the ordered component.IDs.
Keys() []ID
// Range iterates over each translator in order and calls the callback function on each.
Range(func(Translator[C, ID]))
// Len is the number of translators in the map.
Len() int
}
type translatorMap[C any, ID TranslatorID] struct {
// list stores the ordered translators.
list *list.List
// lookup stores the list.Elements containing the translators by ID.
lookup map[ID]*list.Element
}
func (t translatorMap[C, ID]) Set(translator Translator[C, ID]) {
if element, ok := t.lookup[translator.ID()]; ok {
element.Value = translator
} else {
element = t.list.PushBack(translator)
t.lookup[translator.ID()] = element
}
}
func (t translatorMap[C, ID]) Get(id ID) (Translator[C, ID], bool) {
element, ok := t.lookup[id]
if !ok {
return nil, ok
}
return element.Value.(Translator[C, ID]), ok
}
func (t translatorMap[C, ID]) Merge(other TranslatorMap[C, ID]) {
if other != nil {
other.Range(t.Set)
}
}
func (t translatorMap[C, ID]) Keys() []ID {
keys := make([]ID, 0, t.Len())
t.Range(func(translator Translator[C, ID]) {
keys = append(keys, translator.ID())
})
return keys
}
func (t translatorMap[C, ID]) Range(callback func(translator Translator[C, ID])) {
for element := t.list.Front(); element != nil; element = element.Next() {
callback(element.Value.(Translator[C, ID]))
}
}
func (t translatorMap[C, ID]) Len() int {
return t.list.Len()
}
// NewTranslatorMap creates a TranslatorMap from the translators.
func NewTranslatorMap[C any, ID TranslatorID](translators ...Translator[C, ID]) TranslatorMap[C, ID] {
t := translatorMap[C, ID]{
list: list.New(),
lookup: make(map[ID]*list.Element, len(translators)),
}
for _, translator := range translators {
t.Set(translator)
}
return t
}
type ID interface {
String() string
}
// A MissingKeyError occurs when a translator is used for a JSON
// config that does not have a required key. This typically means
// that the pipeline was configured incorrectly.
type MissingKeyError struct {
ID ID
JsonKey string
}
func (e *MissingKeyError) Error() string {
return fmt.Sprintf("%q missing key in JSON: %q", e.ID, e.JsonKey)
}
// ComponentTranslator is a Translator that converts a JSON config into a component
type ComponentTranslator = Translator[component.Config, component.ID]
// ComponentTranslatorMap is a map-like container which stores ComponentTranslators
type ComponentTranslatorMap = TranslatorMap[component.Config, component.ID]
// ComponentTranslators is a component ID and respective service pipeline.
type ComponentTranslators struct {
Receivers ComponentTranslatorMap
Processors ComponentTranslatorMap
Exporters ComponentTranslatorMap
Extensions ComponentTranslatorMap
}
// PipelineTranslator is a Translator that converts a JSON config into a pipeline
type PipelineTranslator = Translator[*ComponentTranslators, pipeline.ID]
// PipelineTranslatorMap is a map-like container which stores PipelineTranslators
type PipelineTranslatorMap = TranslatorMap[*ComponentTranslators, pipeline.ID]
// ConfigKey joins the keys separated by confmap.KeyDelimiter.
// This helps translators navigate the confmap.Conf that the
// JSON config is loaded into.
func ConfigKey(keys ...string) string {
return strings.Join(keys, confmap.KeyDelimiter)
}
// ParseDuration attempts to parse the input into a duration.
// Returns a zero duration and an error if invalid.
func ParseDuration(v interface{}) (time.Duration, error) {
if v != nil {
if fv, ok := v.(float64); ok {
return time.Second * time.Duration(fv), nil
}
s, ok := v.(string)
if !ok {
s = fmt.Sprintf("%v", v)
}
duration, err := time.ParseDuration(s)
if err == nil {
return duration, nil
}
sI, err := strconv.ParseInt(s, 10, 64)
if err == nil {
return time.Second * time.Duration(sI), nil
}
sF, err := strconv.ParseFloat(s, 64)
if err == nil {
return time.Second * time.Duration(sF), nil
}
}
return time.Duration(0), fmt.Errorf("invalid type %v", reflect.TypeOf(v))
}
// GetString gets the string value for the key. If the key is missing,
// ok will be false.
func GetString(conf *confmap.Conf, key string) (string, bool) {
if value := conf.Get(key); value != nil {
got, ok := value.(string)
// if the value isn't a string, convert it
if !ok {
got = fmt.Sprintf("%v", value)
ok = true
}
return got, ok
}
return "", false
}
// GetArray gets the array value for the key. If the key is missing,
// the return value will be nil
func GetArray[C any](conf *confmap.Conf, key string) []C {
if value := conf.Get(key); value != nil {
var arr []C
got, _ := value.([]any)
for _, entry := range got {
if t, ok := entry.(C); ok {
arr = append(arr, t)
}
}
return arr
}
return nil
}
// GetBool gets the bool value for the key. If the key is missing or the
// value is not a bool type, then ok will be false.
func GetBool(conf *confmap.Conf, key string) (value bool, ok bool) {
if v := conf.Get(key); v != nil {
value, ok = v.(bool)
}
return
}
// GetOrDefaultBool gets the bool value for the key. If the key is missing or the
// value is not a bool type, then the defaultVal is returned.
func GetOrDefaultBool(conf *confmap.Conf, key string, defaultVal bool) bool {
if v := conf.Get(key); v != nil {
if val, ok := v.(bool); ok {
return val
}
}
return defaultVal
}
// GetNumber gets the number value for the key. The switch works through
// all reasonable number types (the default is typically float64)
func GetNumber(conf *confmap.Conf, key string) (float64, bool) {
if v := conf.Get(key); v != nil {
switch i := v.(type) {
case float64:
return i, true
case float32:
return float64(i), true
case int64:
return float64(i), true
case int32:
return float64(i), true
case int:
return float64(i), true
case uint64:
return float64(i), true
case uint32:
return float64(i), true
case uint:
return float64(i), true
case string:
}
}
return 0, false
}
// GetOrDefaultNumber gets the number value for the key. If the key is missing or the
// value is not a number type, then the defaultVal is returned.
func GetOrDefaultNumber(conf *confmap.Conf, key string, defaultVal float64) float64 {
value, ok := GetNumber(conf, key)
if !ok {
return defaultVal
}
return value
}
// GetDuration gets the value for the key and calls ParseDuration on it.
// If the key is missing, it is unable to parse the duration, or the
// duration is set to 0, then the returned bool will be false.
func GetDuration(conf *confmap.Conf, key string) (time.Duration, bool) {
var duration time.Duration
var ok bool
if value := conf.Get(key); value != nil {
var err error
duration, err = ParseDuration(value)
ok = err == nil && duration > 0
}
return duration, ok
}
// GetOrDefaultDuration from the first section in the keychain with a
// parsable duration. If none are found, returns the defaultDuration.
func GetOrDefaultDuration(conf *confmap.Conf, keychain []string, defaultDuration time.Duration) time.Duration {
for _, key := range keychain {
duration, ok := GetDuration(conf, key)
if !ok {
continue
}
return duration
}
return defaultDuration
}
func GetYamlFileToYamlConfig(cfg interface{}, yamlFile string) (interface{}, error) {
var cfgMap map[string]interface{}
if err := yaml.Unmarshal([]byte(yamlFile), &cfgMap); err != nil {
return nil, fmt.Errorf("unable to read default config: %w", err)
}
conf := confmap.NewFromStringMap(cfgMap)
if err := conf.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal config: %w", err)
}
return cfg, nil
}
// GetIndexedMap gets the sub map based on the config key and index. If the config value is an array, then the value
// at the index is returned. If it is a map, then the index is ignored and the map is returned directly.
func GetIndexedMap(conf *confmap.Conf, configKey string, index int) map[string]any {
var got map[string]any
switch v := conf.Get(configKey).(type) {
case []any:
if index != -1 && len(v) > index {
got = v[index].(map[string]any)
}
case map[string]any:
got = v
}
return got
}
// GetMeasurements gets the string values in the measurements section of the provided map. If there are metric
// decoration elements, includes the value associated with the "name" key.
func GetMeasurements(m map[string]any) []string {
var results []string
if measurements, ok := m[MeasurementKey].([]any); ok {
for _, measurement := range measurements {
switch v := measurement.(type) {
case string:
results = append(results, v)
case map[string]any:
if n, ok := v[NameKey]; ok {
if s, ok := n.(string); ok {
results = append(results, s)
}
}
}
}
}
return results
}
// IsAnySet checks if any of the provided keys are present in the configuration.
func IsAnySet(conf *confmap.Conf, keys []string) bool {
for _, key := range keys {
if conf.IsSet(key) {
return true
}
}
return false
}
func KueueContainerInsightsEnabled(conf *confmap.Conf) bool {
return GetOrDefaultBool(conf, ConfigKey(LogsKey, MetricsCollectedKey, KubernetesKey, EnableKueueContainerInsights), false)
}
func GetClusterName(conf *confmap.Conf) string {
val, ok := GetString(conf, ConfigKey(LogsKey, MetricsCollectedKey, KubernetesKey, "cluster_name"))
if ok && val != "" {
return val
}
envVarClusterName := os.Getenv("K8S_CLUSTER_NAME")
if envVarClusterName != "" {
return envVarClusterName
}
return util.GetClusterNameFromEc2Tagger()
}