translator/translate/otel/processor/resourceprocessor/translator.go (113 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package resourceprocessor import ( "fmt" "os" "strconv" "strings" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/processor" "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" ) type translator struct { common.NameProvider common.IndexProvider factory processor.Factory } var ( _ common.ComponentTranslator = (*translator)(nil) ) func NewTranslator(opts ...common.TranslatorOption) common.ComponentTranslator { t := &translator{factory: resourceprocessor.NewFactory()} t.SetIndex(-1) for _, opt := range opts { opt(t) } if t.Index() != -1 { t.SetName(t.Name() + "/" + strconv.Itoa(t.Index())) } return t } var _ common.ComponentTranslator = (*translator)(nil) func (t *translator) ID() component.ID { return component.NewIDWithName(t.factory.Type(), t.Name()) } // Translate creates a processor config based on the fields in the // Metrics section of the JSON config. func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { if conf == nil || (!conf.IsSet(common.JmxConfigKey) && t.Name() != common.PipelineNameContainerInsightsJmx) { return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.JmxConfigKey} } cfg := t.factory.CreateDefaultConfig().(*resourceprocessor.Config) var attributes []any if strings.HasPrefix(t.Name(), common.PipelineNameJmx) { attributes = t.getJMXAttributes(conf) } else if t.Name() == common.PipelineNameContainerInsightsJmx { attributes = t.getContainerInsightsJMXAttributes(conf) } if len(attributes) == 0 { baseKey := common.JmxConfigKey if t.Index() != -1 { baseKey = fmt.Sprintf("%s[%d]", baseKey, t.Index()) } return nil, &common.MissingKeyError{ID: t.ID(), JsonKey: common.ConfigKey(baseKey, common.AppendDimensionsKey)} } c := confmap.NewFromStringMap(map[string]any{ "attributes": attributes, }) if err := c.Unmarshal(&cfg); err != nil { return nil, fmt.Errorf("unable to unmarshal resource processor: %w", err) } return cfg, nil } func (t *translator) getJMXAttributes(conf *confmap.Conf) []any { if !context.CurrentContext().RunInContainer() { return []any{ map[string]any{ "action": "delete", "pattern": "telemetry.sdk.*", }, map[string]any{ "action": "delete", "key": "service.name", "value": "unknown_service:java", }, } } jmxMap := common.GetIndexedMap(conf, common.JmxConfigKey, t.Index()) appendDimensions, ok := jmxMap[common.AppendDimensionsKey].(map[string]any) if !ok { return nil } var attributes []any for key, value := range appendDimensions { attributes = append(attributes, map[string]any{ "action": "upsert", "key": key, "value": value, }) } return attributes } func (t *translator) getContainerInsightsJMXAttributes(conf *confmap.Conf) []any { clusterName := common.GetClusterName(conf) nodeName := os.Getenv(config.HOST_NAME) return []any{ map[string]any{ "key": "Namespace", "from_attribute": "k8s.namespace.name", "action": "insert", }, map[string]any{ "key": "ClusterName", "value": clusterName, // Ensure 'clusterName' is defined earlier "action": "upsert", }, map[string]any{ "key": "NodeName", "value": nodeName, "action": "insert", }, } }