translator/translate/otel/processor/awsapplicationsignals/translator.go (203 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package awsapplicationsignals import ( _ "embed" "errors" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals" appsignalsconfig "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/rules" "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/context" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" "github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil" ) type translator struct { name string signal pipeline.Signal factory processor.Factory } type Option interface { apply(t *translator) } type optionFunc func(t *translator) func (o optionFunc) apply(t *translator) { o(t) } // WithSignal determines where the translator should look to find // the configuration. func WithSignal(signal pipeline.Signal) Option { return optionFunc(func(t *translator) { t.signal = signal }) } var _ common.ComponentTranslator = (*translator)(nil) func NewTranslator(opts ...Option) common.ComponentTranslator { t := &translator{factory: awsapplicationsignals.NewFactory()} for _, opt := range opts { opt.apply(t) } return t } func (t *translator) ID() component.ID { return component.NewIDWithName(t.factory.Type(), t.name) } func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { configKey := common.AppSignalsConfigKeys[t.signal] cfg := t.factory.CreateDefaultConfig().(*appsignalsconfig.Config) hostedIn, hostedInConfigured := common.GetHostedIn(conf) if common.IsAppSignalsKubernetes() { if !hostedInConfigured { hostedIn = common.GetClusterName(conf) } } mode := context.CurrentContext().KubernetesMode() if mode == "" { mode = context.CurrentContext().Mode() } if mode == config.ModeEC2 { if ecsutil.GetECSUtilSingleton().IsECS() { mode = config.ModeECS } } switch mode { case config.ModeEKS: cfg.Resolvers = []appsignalsconfig.Resolver{ appsignalsconfig.NewEKSResolver(hostedIn), } case config.ModeK8sEC2, config.ModeK8sOnPrem: cfg.Resolvers = []appsignalsconfig.Resolver{ appsignalsconfig.NewK8sResolver(hostedIn), } case config.ModeEC2: cfg.Resolvers = []appsignalsconfig.Resolver{ appsignalsconfig.NewEC2Resolver(hostedIn), } case config.ModeECS: cfg.Resolvers = []appsignalsconfig.Resolver{ appsignalsconfig.NewECSResolver(hostedIn), } default: cfg.Resolvers = []appsignalsconfig.Resolver{ appsignalsconfig.NewGenericResolver(hostedIn), } } limiterConfig, _ := t.translateMetricLimiterConfig(conf, configKey) cfg.Limiter = limiterConfig return t.translateCustomRules(conf, configKey, cfg) } func (t *translator) translateMetricLimiterConfig(conf *confmap.Conf, configKey []string) (*appsignalsconfig.LimiterConfig, error) { limiterConfigKey := common.ConfigKey(configKey[0], "limiter") if !conf.IsSet(limiterConfigKey) { limiterConfigKey = common.ConfigKey(configKey[1], "limiter") if !conf.IsSet(limiterConfigKey) { return nil, nil } } configJson, ok := conf.Get(limiterConfigKey).(map[string]interface{}) if !ok { return nil, errors.New("type conversion error: limiter is not an object") } limiterConfig := appsignalsconfig.NewDefaultLimiterConfig() if rawVal, exists := configJson["drop_threshold"]; exists { if val, ok := rawVal.(float64); !ok { return nil, errors.New("type conversion error: drop_threshold is not a number") } else { limiterConfig.Threshold = int(val) } } if rawVal, exists := configJson["disabled"]; exists { if val, ok := rawVal.(bool); !ok { return nil, errors.New("type conversion error: disabled is not a boolean") } else { limiterConfig.Disabled = val } } if rawVal, exists := configJson["log_dropped_metrics"]; exists { if val, ok := rawVal.(bool); !ok { return nil, errors.New("type conversion error: log_dropped_metrics is not a boolean") } else { limiterConfig.LogDroppedMetrics = val } } if rawVal, exists := configJson["garbage_collection_interval"]; exists { if val, ok := rawVal.(string); !ok { return nil, errors.New("type conversion error: garbage_collection_interval is not a string") } else { if interval, err := time.ParseDuration(val); err != nil { return nil, errors.New("type conversion error: garbage_collection_interval is not a time string") } else { limiterConfig.GarbageCollectionInterval = interval } } } if rawVal, exists := configJson["rotation_interval"]; exists { if val, ok := rawVal.(string); !ok { return nil, errors.New("type conversion error: rotation_interval is not a string") } else { if interval, err := time.ParseDuration(val); err != nil { return nil, errors.New("type conversion error: rotation_interval is not a time string") } else { limiterConfig.RotationInterval = interval } } } return limiterConfig, nil } func (t *translator) translateCustomRules(conf *confmap.Conf, configKey []string, cfg *appsignalsconfig.Config) (component.Config, error) { var rulesList []rules.Rule rulesConfigKey := common.ConfigKey(configKey[0], common.AppSignalsRules) if !conf.IsSet(rulesConfigKey) { rulesConfigKey = common.ConfigKey(configKey[1], common.AppSignalsRules) } if conf.IsSet(rulesConfigKey) { for _, rule := range conf.Get(rulesConfigKey).([]interface{}) { ruleConfig := rules.Rule{} ruleMap := rule.(map[string]interface{}) selectors := ruleMap["selectors"].([]interface{}) action := ruleMap["action"].(string) ruleConfig.Selectors = getServiceSelectors(selectors) if ruleName, ok := ruleMap["rule_name"]; ok { ruleConfig.RuleName = ruleName.(string) } var err error ruleConfig.Action, err = rules.GetAllowListAction(action) if err != nil { return nil, err } if ruleConfig.Action == rules.AllowListActionReplace { replacements, ok := ruleMap["replacements"] if !ok { return nil, errors.New("replace action set, but no replacements defined for service rule") } ruleConfig.Replacements = getServiceReplacements(replacements) } rulesList = append(rulesList, ruleConfig) } cfg.Rules = rulesList } return cfg, nil } func getServiceSelectors(selectorsList []interface{}) []rules.Selector { var selectors []rules.Selector for _, selector := range selectorsList { selectorConfig := rules.Selector{} selectorsMap := selector.(map[string]interface{}) selectorConfig.Dimension = selectorsMap["dimension"].(string) selectorConfig.Match = selectorsMap["match"].(string) selectors = append(selectors, selectorConfig) } return selectors } func getServiceReplacements(replacementsList interface{}) []rules.Replacement { var replacements []rules.Replacement for _, replacement := range replacementsList.([]interface{}) { replacementConfig := rules.Replacement{} replacementMap := replacement.(map[string]interface{}) replacementConfig.TargetDimension = replacementMap["target_dimension"].(string) replacementConfig.Value = replacementMap["value"].(string) replacements = append(replacements, replacementConfig) } return replacements }