translator/translate/otel/exporter/awsemf/translator.go (202 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package awsemf import ( _ "embed" "fmt" "os" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/exporter" "gopkg.in/yaml.v3" "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" "github.com/aws/amazon-cloudwatch-agent/internal/retryer" "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/agent" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/awscontainerinsight" ) //go:embed awsemf_default_generic.yaml var defaultGenericConfig string //go:embed awsemf_default_ecs.yaml var defaultEcsConfig string //go:embed awsemf_default_kubernetes.yaml var defaultKubernetesConfig string //go:embed awsemf_default_kubernetes_kueue.yaml var defaultKubernetesKueueConfig string //go:embed awsemf_default_prometheus.yaml var defaultPrometheusConfig string //go:embed awsemf_default_appsignals.yaml var appSignalsConfigGeneric string //go:embed awsemf_jmx_config.yaml var defaultJmxConfig string var ( ecsBasePathKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.ECSKey) kubernetesBasePathKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.KubernetesKey) kubernetesKueueBasePathKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.KubernetesKey, common.EnableKueueContainerInsights) prometheusBasePathKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.PrometheusKey) emfProcessorBasePathKey = common.ConfigKey(prometheusBasePathKey, common.EMFProcessorKey) endpointOverrideKey = common.ConfigKey(common.LogsKey, common.EndpointOverrideKey) roleARNPathKey = common.ConfigKey(common.LogsKey, common.CredentialsKey, common.RoleARNKey) ) type translator struct { name string factory exporter.Factory } var _ common.ComponentTranslator = (*translator)(nil) func NewTranslator() common.ComponentTranslator { return NewTranslatorWithName("") } func NewTranslatorWithName(name string) common.ComponentTranslator { return &translator{name, awsemfexporter.NewFactory()} } func (t *translator) ID() component.ID { return component.NewIDWithName(t.factory.Type(), t.name) } // Translate creates an awsemf exporter config based on the input json config func (t *translator) Translate(c *confmap.Conf) (component.Config, error) { cfg := t.factory.CreateDefaultConfig().(*awsemfexporter.Config) cfg.MiddlewareID = &agenthealth.LogsID defaultConfig := defaultGenericConfig if t.isAppSignals(c) { defaultConfig = appSignalsConfigGeneric } else if t.isCiJMX(c) { defaultConfig = defaultJmxConfig } else if isEcs(c) { defaultConfig = defaultEcsConfig } else if isKubernetesKueue(c, t.name) { defaultConfig = defaultKubernetesKueueConfig } else if isKubernetes(c) { defaultConfig = defaultKubernetesConfig } else if isPrometheus(c) { defaultConfig = defaultPrometheusConfig } if isOTLP(c) { cfg.AddEntity = true } if defaultConfig != "" { var rawConf map[string]interface{} if err := yaml.Unmarshal([]byte(defaultConfig), &rawConf); err != nil { return nil, fmt.Errorf("unable to read default config: %w", err) } conf := confmap.NewFromStringMap(rawConf) if err := conf.Unmarshal(&cfg); err != nil { return nil, fmt.Errorf("unable to unmarshal config: %w", err) } } cfg.AWSSessionSettings.CertificateFilePath = os.Getenv(envconfig.AWS_CA_BUNDLE) if c.IsSet(endpointOverrideKey) { cfg.AWSSessionSettings.Endpoint, _ = common.GetString(c, endpointOverrideKey) } cfg.AWSSessionSettings.IMDSRetries = retryer.GetDefaultRetryNumber() if profileKey, ok := agent.Global_Config.Credentials[agent.Profile_Key]; ok { cfg.AWSSessionSettings.Profile = fmt.Sprintf("%v", profileKey) } cfg.AWSSessionSettings.Region = agent.Global_Config.Region cfg.AWSSessionSettings.RoleARN = agent.Global_Config.Role_arn if c.IsSet(roleARNPathKey) { cfg.AWSSessionSettings.RoleARN, _ = common.GetString(c, roleARNPathKey) } if credentialsFileKey, ok := agent.Global_Config.Credentials[agent.CredentialsFile_Key]; ok { cfg.AWSSessionSettings.SharedCredentialsFile = []string{fmt.Sprintf("%v", credentialsFileKey)} } if context.CurrentContext().Mode() == config.ModeOnPrem || context.CurrentContext().Mode() == config.ModeOnPremise { cfg.AWSSessionSettings.LocalMode = true } if t.isAppSignals(c) { if err := setAppSignalsFields(c, cfg); err != nil { return nil, err } } else if t.isCiJMX(c) { if err := setCiJmxFields(); err != nil { return nil, err } } else if isEcs(c) { if err := setEcsFields(c, cfg); err != nil { return nil, err } } else if isKubernetesKueue(c, t.name) { if err := setKubernetesKueueFields(c, cfg); err != nil { return nil, err } } else if isKubernetes(c) { if err := setKubernetesFields(c, cfg); err != nil { return nil, err } } else if isPrometheus(c) { if err := setPrometheusFields(c, cfg); err != nil { return nil, err } } return cfg, nil } func (t *translator) isAppSignals(conf *confmap.Conf) bool { return (t.name == common.AppSignals || t.name == common.AppSignalsFallback) && (conf.IsSet(common.AppSignalsMetrics) || conf.IsSet(common.AppSignalsTraces) || conf.IsSet(common.AppSignalsMetricsFallback) || conf.IsSet(common.AppSignalsTracesFallback)) } func (t *translator) isCiJMX(conf *confmap.Conf) bool { return (t.name == common.PipelineNameContainerInsightsJmx) && (conf.IsSet(common.ContainerInsightsConfigKey)) } func isEcs(conf *confmap.Conf) bool { return conf.IsSet(ecsBasePathKey) } func isKubernetes(conf *confmap.Conf) bool { return conf.IsSet(kubernetesBasePathKey) } // `kueue_container_insights` is a child of `kubernetes` in config spec. func isKubernetesKueue(conf *confmap.Conf, pipelineName string) bool { return isKubernetes(conf) && pipelineName == common.PipelineNameKueue && common.GetOrDefaultBool(conf, kubernetesKueueBasePathKey, false) } func isPrometheus(conf *confmap.Conf) bool { return conf.IsSet(prometheusBasePathKey) } func isOTLP(conf *confmap.Conf) bool { return conf.IsSet(common.OTLPLogsKey) } func setAppSignalsFields(_ *confmap.Conf, _ *awsemfexporter.Config) error { return nil } func setEcsFields(conf *confmap.Conf, cfg *awsemfexporter.Config) error { setDisableMetricExtraction(ecsBasePathKey, conf, cfg) return nil } func setKubernetesFields(conf *confmap.Conf, cfg *awsemfexporter.Config) error { cfg.AddEntity = true setDisableMetricExtraction(kubernetesBasePathKey, conf, cfg) if err := setKubernetesMetricDeclaration(conf, cfg); err != nil { return err } if awscontainerinsight.EnhancedContainerInsightsEnabled(conf) { cfg.EnhancedContainerInsights = true } return nil } func setCiJmxFields() error { return nil } func setKubernetesKueueFields(conf *confmap.Conf, cfg *awsemfexporter.Config) error { setDisableMetricExtraction(kubernetesKueueBasePathKey, conf, cfg) if err := setKubernetesKueueMetricDeclaration(conf, cfg); err != nil { return err } return nil } func setPrometheusFields(conf *confmap.Conf, cfg *awsemfexporter.Config) error { setDisableMetricExtraction(prometheusBasePathKey, conf, cfg) if err := setPrometheusLogGroup(conf, cfg); err != nil { return err } if conf.IsSet(emfProcessorBasePathKey) { if err := setPrometheusNamespace(conf, cfg); err != nil { return err } if err := setPrometheusMetricDescriptors(conf, cfg); err != nil { return err } if err := setPrometheusMetricDeclarations(conf, cfg); err != nil { return err } } if len(cfg.MetricDeclarations) == 0 { // When there are no metric declarations, CWA does not generate any EMF structured logs and instead just publishes them as plain log events // The awsemfexporter by default generates EMF structured logs for all if there are no metric declarations, hence adding a dummy rule here to prevent it cfg.MetricDeclarations = []*awsemfexporter.MetricDeclaration{ { MetricNameSelectors: []string{"$^"}, }, } } return nil } func setDisableMetricExtraction(baseKey string, conf *confmap.Conf, cfg *awsemfexporter.Config) { cfg.DisableMetricExtraction = common.GetOrDefaultBool(conf, common.ConfigKey(baseKey, common.DisableMetricExtraction), false) }