translator/translate/otel/pipeline/host/translator.go (141 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package host
import (
"fmt"
"log"
"slices"
"strings"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/pipeline"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatch"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awsemf"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/prometheusremotewrite"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/k8smetadata"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/sigv4auth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/awsentity"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/deltatocumulativeprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)
// translator is the host metrics pipeline translator returned by NewTranslator.
type translator struct {
// name is the pipeline name reported by ID. NewTranslator appends
// "/<destination>" to it when a destination has been configured.
name string
// DestinationProvider is embedded; it presumably supplies the Destination()
// method that NewTranslator and Translate call on the translator.
common.DestinationProvider
// receivers holds the receiver translators that Translate places into the
// resulting pipeline unchanged.
receivers common.ComponentTranslatorMap
}
var (
	// Compile-time assertion that *translator implements common.PipelineTranslator.
	_ common.PipelineTranslator = (*translator)(nil)

	// supportedEntityProcessorDestinations lists the destinations for which
	// Translate is willing to add the entity processor to the pipeline.
	supportedEntityProcessorDestinations = [...]string{
		common.DefaultDestination,
		common.CloudWatchKey,
		common.CloudWatchLogsKey,
	}
)
// NewTranslator creates a host pipeline translator called name whose pipeline
// uses the given receiver translators as-is. Every option in opts is applied
// to the new translator in the order given. If the translator reports a
// non-empty destination once the options have run, "/<destination>" is
// appended to the pipeline name.
func NewTranslator(
	name string,
	receivers common.ComponentTranslatorMap,
	opts ...common.TranslatorOption,
) common.PipelineTranslator {
	hostTranslator := &translator{
		name:      name,
		receivers: receivers,
	}
	for i := range opts {
		opts[i](hostTranslator)
	}
	// Without a destination the name is left exactly as supplied.
	if hostTranslator.Destination() == "" {
		return hostTranslator
	}
	hostTranslator.name = hostTranslator.name + "/" + hostTranslator.Destination()
	return hostTranslator
}
// ID returns the metrics-signal pipeline ID that carries this translator's name.
func (t translator) ID() pipeline.ID {
	id := pipeline.NewIDWithName(pipeline.SignalMetrics, t.name)
	return id
}
// Translate assembles the receivers, processors, exporters and extensions that
// make up this host metrics pipeline.
//
// It returns an error when conf is nil, when no receivers were supplied, or
// when the destination is not one of common.DefaultDestination,
// common.CloudWatchKey, common.AMPKey or common.CloudWatchLogsKey. Otherwise it
// returns the populated common.ComponentTranslators.
//
// NOTE(review): a nil conf is reported with the same "no receivers configured"
// message as an empty receiver set, which may mislead when diagnosing failures.
// NOTE(review): processors are added with successive Set calls; presumably the
// insertion order is the processor order in the pipeline — confirm before
// reordering any of the sections below.
func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, error) {
if conf == nil || t.receivers.Len() == 0 {
return nil, fmt.Errorf("no receivers configured in pipeline %s", t.name)
}
// entityProcessor stays nil unless the pipeline-type switch below picks one.
var entityProcessor common.ComponentTranslator
// ec2TaggerEnabled records whether the ec2tagger processor was added; it is
// passed on to the entity processor for the host and host-delta pipelines.
var ec2TaggerEnabled bool
translators := common.ComponentTranslators{
Receivers: t.receivers,
Processors: common.NewTranslatorMap[component.Config, component.ID](),
Exporters: common.NewTranslatorMap[component.Config, component.ID](),
Extensions: common.NewTranslatorMap[component.Config, component.ID](),
}
// Pipelines whose name starts with the delta or OTLP host prefix get a
// cumulative-to-delta processor.
// NOTE(review): the debug message mentions diskio/net even when it was the
// OTLP prefix that matched.
if strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics) || strings.HasPrefix(t.name, common.PipelineNameHostOtlpMetrics) {
log.Printf("D! delta processor required because metrics with diskio or net are set")
translators.Processors.Set(cumulativetodeltaprocessor.NewTranslator(common.WithName(t.name), cumulativetodeltaprocessor.WithDefaultKeys()))
}
// The ec2tagger and metrics decorator processors are only considered when the
// destination is not CloudWatch Logs.
if t.Destination() != common.CloudWatchLogsKey {
if conf.IsSet(common.ConfigKey(common.MetricsKey, common.AppendDimensionsKey)) {
log.Printf("D! ec2tagger processor required because append_dimensions is set")
translators.Processors.Set(ec2taggerprocessor.NewTranslator())
ec2TaggerEnabled = true
}
// common.JmxKey is handed to WithIgnorePlugins; presumably this excludes the
// JMX plugin from decoration — confirm against metricsdecorator.
mdt := metricsdecorator.NewTranslator(metricsdecorator.WithIgnorePlugins(common.JmxKey))
if mdt.IsSet(conf) {
log.Printf("D! metric decorator required because measurement fields are set")
translators.Processors.Set(mdt)
}
}
currentContext := context.CurrentContext()
// Pick an entity processor according to the pipeline type derived from the
// name prefix. It is only a candidate here; whether it is actually added is
// decided after the switch.
switch determinePipeline(t.name) {
case common.PipelineNameHostOtlpMetrics:
// TODO: For OTLP, the entity processor is only on K8S for now. Eventually this should be added to EC2
// NOTE(review): the else-if branch below also creates the processor when
// Mode() is config.ModeEC2, which does not match the TODO above — confirm
// which is intended. Only the Kubernetes branch adds the k8smetadata extension.
if currentContext.KubernetesMode() != "" {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, common.OtlpKey, false)
translators.Extensions.Set(k8smetadata.NewTranslator())
} else if currentContext.Mode() == config.ModeEC2 {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, common.OtlpKey, false)
}
case common.PipelineNameHostCustomMetrics:
if !currentContext.RunInContainer() {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, "telegraf", true)
}
case common.PipelineNameHost, common.PipelineNameHostDeltaMetrics:
if !currentContext.RunInContainer() {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Resource, "", ec2TaggerEnabled)
}
}
// The candidate is added only if one was selected, the agent mode is
// config.ModeEC2, the platform is not ECS, and the destination is listed in
// supportedEntityProcessorDestinations.
validDestination := slices.Contains(supportedEntityProcessorDestinations[:], t.Destination())
// ECS is not in scope for entity association, so we only add the entity processor in non-ECS platforms
isECS := ecsutil.GetECSUtilSingleton().IsECS()
if entityProcessor != nil && currentContext.Mode() == config.ModeEC2 && !isECS && validDestination {
translators.Processors.Set(entityProcessor)
}
// Add the exporter, extensions and any remaining processors specific to the
// destination; an unrecognised destination is an error.
switch t.Destination() {
case common.DefaultDestination, common.CloudWatchKey:
translators.Exporters.Set(awscloudwatch.NewTranslator())
translators.Extensions.Set(agenthealth.NewTranslator(agenthealth.MetricsName, []string{agenthealth.OperationPutMetricData}))
translators.Extensions.Set(agenthealth.NewTranslatorWithStatusCode(agenthealth.StatusCodeName, nil, true))
case common.AMPKey:
// The rollup processor is added only when aggregation dimensions are configured.
if conf.IsSet(common.MetricsAggregationDimensionsKey) {
translators.Processors.Set(rollupprocessor.NewTranslator())
}
translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey))
// prometheusremotewrite doesn't support delta metrics so convert them to cumulative metrics
translators.Processors.Set(deltatocumulativeprocessor.NewTranslator(common.WithName(t.name)))
translators.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey))
translators.Extensions.Set(sigv4auth.NewTranslator())
case common.CloudWatchLogsKey:
translators.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.LogsKey))
translators.Exporters.Set(awsemf.NewTranslator())
translators.Extensions.Set(agenthealth.NewTranslator(agenthealth.LogsName, []string{agenthealth.OperationPutLogEvents}))
translators.Extensions.Set(agenthealth.NewTranslatorWithStatusCode(agenthealth.StatusCodeName, nil, true))
default:
return nil, fmt.Errorf("pipeline (%s) does not support destination (%s) in configuration", t.name, t.Destination())
}
return &translators, nil
}
// determinePipeline returns the host pipeline name constant that name starts
// with, or "" when name matches none of them.
func determinePipeline(name string) string {
	// Order matters: PipelineNameHost is just "host" while the other constants
	// are themselves prefixed with "host", so the more specific names have to
	// be tried before the generic one.
	candidates := [...]string{
		common.PipelineNameHostDeltaMetrics,
		common.PipelineNameHostOtlpMetrics,
		common.PipelineNameHostCustomMetrics,
		common.PipelineNameHost,
	}
	for _, candidate := range candidates {
		if strings.HasPrefix(name, candidate) {
			return candidate
		}
	}
	return ""
}