func()

in translator/translate/otel/pipeline/host/translator.go [72:158]


// Translate assembles the receiver, processor, exporter and extension
// translators that make up this host pipeline.
//
// The receivers are taken from the translator as-is. Processors and
// extensions are registered in a fixed sequence: delta conversion, ec2tagger
// and metric decoration, the entity processor, then whatever the destination
// needs. An error is returned when conf is nil, when no receivers are
// configured, or when the destination is not one handled by the switch at the
// end of the function.
func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, error) {
	if conf == nil || t.receivers.Len() == 0 {
		return nil, fmt.Errorf("no receivers configured in pipeline %s", t.name)
	}

	pipeline := common.ComponentTranslators{
		Receivers:  t.receivers,
		Processors: common.NewTranslatorMap[component.Config, component.ID](),
		Exporters:  common.NewTranslatorMap[component.Config, component.ID](),
		Extensions: common.NewTranslatorMap[component.Config, component.ID](),
	}

	// Pipelines named with the delta or OTLP metrics prefix get a
	// cumulative-to-delta processor.
	isDeltaPipeline := strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics)
	if isDeltaPipeline || strings.HasPrefix(t.name, common.PipelineNameHostOtlpMetrics) {
		log.Printf("D! delta processor required because metrics with diskio or net are set")
		pipeline.Processors.Set(cumulativetodeltaprocessor.NewTranslator(common.WithName(t.name), cumulativetodeltaprocessor.WithDefaultKeys()))
	}

	// ec2tagger and the metric decorator are only considered when the
	// destination is something other than CloudWatch Logs.
	taggerEnabled := false
	if t.Destination() != common.CloudWatchLogsKey {
		appendDimensionsKey := common.ConfigKey(common.MetricsKey, common.AppendDimensionsKey)
		if conf.IsSet(appendDimensionsKey) {
			log.Printf("D! ec2tagger processor required because append_dimensions is set")
			pipeline.Processors.Set(ec2taggerprocessor.NewTranslator())
			taggerEnabled = true
		}

		if decorator := metricsdecorator.NewTranslator(metricsdecorator.WithIgnorePlugins(common.JmxKey)); decorator.IsSet(conf) {
			log.Printf("D! metric decorator required because measurement fields are set")
			pipeline.Processors.Set(decorator)
		}
	}

	agentCtx := context.CurrentContext()

	// Choose the entity processor candidate (if any) for this pipeline kind.
	// It stays nil when the pipeline kind or runtime environment does not
	// call for one.
	var entityProc common.ComponentTranslator
	switch determinePipeline(t.name) {
	case common.PipelineNameHostOtlpMetrics:
		// TODO: For OTLP, the entity processor is only on K8S for now. Eventually this should be added to EC2
		onKubernetes := agentCtx.KubernetesMode() != ""
		if onKubernetes || agentCtx.Mode() == config.ModeEC2 {
			entityProc = awsentity.NewTranslatorWithEntityType(awsentity.Service, common.OtlpKey, false)
		}
		if onKubernetes {
			pipeline.Extensions.Set(k8smetadata.NewTranslator())
		}
	case common.PipelineNameHostCustomMetrics:
		if !agentCtx.RunInContainer() {
			entityProc = awsentity.NewTranslatorWithEntityType(awsentity.Service, "telegraf", true)
		}
	case common.PipelineNameHost, common.PipelineNameHostDeltaMetrics:
		if !agentCtx.RunInContainer() {
			entityProc = awsentity.NewTranslatorWithEntityType(awsentity.Resource, "", taggerEnabled)
		}
	}

	destinationSupported := slices.Contains(supportedEntityProcessorDestinations[:], t.Destination())
	// ECS is not in scope for entity association, so we only add the entity processor in non-ECS platforms
	onECS := ecsutil.GetECSUtilSingleton().IsECS()
	if entityProc != nil && agentCtx.Mode() == config.ModeEC2 {
		if destinationSupported && !onECS {
			pipeline.Processors.Set(entityProc)
		}
	}

	// Wire up the destination-specific processors, exporter and extensions.
	switch t.Destination() {
	case common.DefaultDestination, common.CloudWatchKey:
		pipeline.Exporters.Set(awscloudwatch.NewTranslator())
		pipeline.Extensions.Set(agenthealth.NewTranslator(agenthealth.MetricsName, []string{agenthealth.OperationPutMetricData}))
		pipeline.Extensions.Set(agenthealth.NewTranslatorWithStatusCode(agenthealth.StatusCodeName, nil, true))
	case common.AMPKey:
		if conf.IsSet(common.MetricsAggregationDimensionsKey) {
			pipeline.Processors.Set(rollupprocessor.NewTranslator())
		}
		pipeline.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.MetricsKey))
		// prometheusremotewrite doesn't support delta metrics so convert them to cumulative metrics
		pipeline.Processors.Set(deltatocumulativeprocessor.NewTranslator(common.WithName(t.name)))
		pipeline.Exporters.Set(prometheusremotewrite.NewTranslatorWithName(common.AMPKey))
		pipeline.Extensions.Set(sigv4auth.NewTranslator())
	case common.CloudWatchLogsKey:
		pipeline.Processors.Set(batchprocessor.NewTranslatorWithNameAndSection(t.name, common.LogsKey))
		pipeline.Exporters.Set(awsemf.NewTranslator())
		pipeline.Extensions.Set(agenthealth.NewTranslator(agenthealth.LogsName, []string{agenthealth.OperationPutLogEvents}))
		pipeline.Extensions.Set(agenthealth.NewTranslatorWithStatusCode(agenthealth.StatusCodeName, nil, true))
	default:
		return nil, fmt.Errorf("pipeline (%s) does not support destination (%s) in configuration", t.name, t.Destination())
	}

	return &pipeline, nil
}