in translator/translate/otel/pipeline/host/translator.go [72:158]
// Translate assembles the receivers, processors, exporters and extensions for
// this host metrics pipeline from the agent configuration.
//
// It returns an error when conf is nil or the pipeline has no receivers, and
// when the pipeline's destination is not one of the cases handled at the end
// of the function (default/CloudWatch, AMP, CloudWatch Logs).
//
// NOTE(review): the sequence of Processors.Set calls is kept exactly as it
// was; presumably insertion order decides processor order in the generated
// pipeline — confirm before reordering anything here.
func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, error) {
	if conf == nil || t.receivers.Len() == 0 {
		return nil, fmt.Errorf("no receivers configured in pipeline %s", t.name)
	}

	result := common.ComponentTranslators{
		Receivers:  t.receivers,
		Processors: common.NewTranslatorMap[component.Config, component.ID](),
		Exporters:  common.NewTranslatorMap[component.Config, component.ID](),
		Extensions: common.NewTranslatorMap[component.Config, component.ID](),
	}

	// Pipelines whose name starts with the host delta or host OTLP metrics
	// prefix get a cumulative-to-delta processor.
	needsDelta := strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics) ||
		strings.HasPrefix(t.name, common.PipelineNameHostOtlpMetrics)
	if needsDelta {
		log.Printf("D! delta processor required because metrics with diskio or net are set")
		result.Processors.Set(cumulativetodeltaprocessor.NewTranslator(common.WithName(t.name), cumulativetodeltaprocessor.WithDefaultKeys()))
	}

	// The ec2tagger and metric decorator processors are skipped when the
	// destination is CloudWatch Logs.
	usesEC2Tagger := false
	if t.Destination() != common.CloudWatchLogsKey {
		if conf.IsSet(common.ConfigKey(common.MetricsKey, common.AppendDimensionsKey)) {
			log.Printf("D! ec2tagger processor required because append_dimensions is set")
			result.Processors.Set(ec2taggerprocessor.NewTranslator())
			usesEC2Tagger = true
		}

		decorator := metricsdecorator.NewTranslator(metricsdecorator.WithIgnorePlugins(common.JmxKey))
		if decorator.IsSet(conf) {
			log.Printf("D! metric decorator required because measurement fields are set")
			result.Processors.Set(decorator)
		}
	}

	// Choose a candidate entity processor for this pipeline type. Whether it
	// is actually registered is decided further down.
	agentCtx := context.CurrentContext()
	var entity common.ComponentTranslator
	switch determinePipeline(t.name) {
	case common.PipelineNameHostOtlpMetrics:
		// OTLP metrics get a service entity processor on Kubernetes and in EC2
		// mode; the k8smetadata extension is only added on Kubernetes.
		onK8s := agentCtx.KubernetesMode() != ""
		if onK8s || agentCtx.Mode() == config.ModeEC2 {
			entity = awsentity.NewTranslatorWithEntityType(awsentity.Service, common.OtlpKey, false)
		}
		if onK8s {
			result.Extensions.Set(k8smetadata.NewTranslator())
		}
	case common.PipelineNameHostCustomMetrics:
		if !agentCtx.RunInContainer() {
			entity = awsentity.NewTranslatorWithEntityType(awsentity.Service, "telegraf", true)
		}
	case common.PipelineNameHost, common.PipelineNameHostDeltaMetrics:
		if !agentCtx.RunInContainer() {
			entity = awsentity.NewTranslatorWithEntityType(awsentity.Resource, "", usesEC2Tagger)
		}
	}

	destinationSupported := slices.Contains(supportedEntityProcessorDestinations[:], t.Destination())
	// ECS is not in scope for entity association, so we only add the entity processor in non-ECS platforms
	onECS := ecsutil.GetECSUtilSingleton().IsECS()
	if entity != nil {
		// Registered only in EC2 mode, outside ECS, and for a destination
		// listed in supportedEntityProcessorDestinations.
		if agentCtx.Mode() == config.ModeEC2 && !onECS && destinationSupported {
			result.Processors.Set(entity)
		}
	}

	// Destination-specific processors, exporter and extensions.
	switch t.Destination() {
	case common.DefaultDestination, common.CloudWatchKey:
		result.Exporters.Set(awscloudwatch.NewTranslator())
		result.Extensions.Set(agenthealth.NewTranslator(agenthealth.MetricsName, []string{agenthealth.OperationPutMetricData}))
		result.Extensions.Set(agenthealth.NewTranslatorWithStatusCode(agenthealth.StatusCodeName, nil, true))
	case common.AMPKey:
		if conf.IsSet(common.MetricsAggregationDimensionsKey) {
			result.Processors.Set(rollupprocessor.NewTranslator())
		}
		result.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey))
		// prometheusremotewrite doesn't support delta metrics so convert them to cumulative metrics
		result.Processors.Set(deltatocumulativeprocessor.NewTranslator(common.WithName(t.name)))
		result.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey))
		result.Extensions.Set(sigv4auth.NewTranslator())
	case common.CloudWatchLogsKey:
		result.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.LogsKey))
		result.Exporters.Set(awsemf.NewTranslator())
		result.Extensions.Set(agenthealth.NewTranslator(agenthealth.LogsName, []string{agenthealth.OperationPutLogEvents}))
		result.Extensions.Set(agenthealth.NewTranslatorWithStatusCode(agenthealth.StatusCodeName, nil, true))
	default:
		return nil, fmt.Errorf("pipeline (%s) does not support destination (%s) in configuration", t.name, t.Destination())
	}

	return &result, nil
}