func()

in confgenerator/confgenerator.go [207:265]


// otelComponents expands this pipeline instance into its OpenTelemetry
// building blocks.
//
// It returns two maps:
//   - receiver pipelines, keyed by a name derived from the receiver ID
//     (underscores doubled, plus an "_<index>" suffix for every receiver
//     pipeline after the first);
//   - pipelines, keyed by "<pipeline ID>_<receiver pipeline name>" (pipeline ID
//     underscores doubled), additionally prefixed with the pipeline type for
//     every type other than "metrics".
//
// An error is returned (with both maps nil) when the receiver is not an
// OTelReceiver, when the receiver or any processor reports an invalid
// configuration, when a processor is not an OTelProcessor, or when processors
// are configured on a receiver pipeline whose exporter type for this pipeline
// type is otel.GMP.
func (p pipelineInstance) otelComponents(ctx context.Context) (map[string]otel.ReceiverPipeline, map[string]otel.Pipeline, error) {
	otelReceiver, isOTel := p.receiver.(OTelReceiver)
	if !isOTel {
		return nil, nil, fmt.Errorf("%q is not an otel receiver", p.rID)
	}
	// TODO: Add a way for receivers or processors to decide whether they're compatible with a particular config.
	expanded, err := otelReceiver.Pipelines(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("receiver %q has invalid configuration: %w", p.rID, err)
	}

	// Underscores in user-supplied IDs are doubled so that the single "_" used
	// as a separator in the generated names stays distinguishable from them.
	escapedReceiverID := strings.ReplaceAll(p.rID, "_", "__")
	escapedPipelineID := strings.ReplaceAll(p.pID, "_", "__")

	receiversByName := make(map[string]otel.ReceiverPipeline)
	pipelinesByName := make(map[string]otel.Pipeline)

	for idx, rp := range expanded {
		// The first receiver pipeline keeps the bare name; later ones get an index suffix.
		rpName := escapedReceiverID
		if idx > 0 {
			rpName = fmt.Sprintf("%s_%d", rpName, idx)
		}

		pipelineKey := fmt.Sprintf("%s_%s", escapedPipelineID, rpName)
		if p.pipelineType != "metrics" {
			// Don't prepend for metrics pipelines to preserve old golden configs.
			pipelineKey = fmt.Sprintf("%s_%s", p.pipelineType, pipelineKey)
		}

		// Only receiver pipelines that already declare "logs" processors get
		// the log-name components appended.
		if logProcessors, found := rp.Processors["logs"]; found {
			rp.Processors["logs"] = append(logProcessors, otelSetLogNameComponents(ctx, p.rID)...)
		}

		receiversByName[rpName] = rp

		// Check the Ops Agent receiver type.
		// Prometheus receivers are incompatible with processors, so we need to assert that no processors are configured.
		if rp.ExporterTypes[p.pipelineType] == otel.GMP && len(p.processors) > 0 {
			return nil, nil, fmt.Errorf("prometheus receivers are incompatible with Ops Agent processors")
		}

		built := otel.Pipeline{
			Type:                 p.pipelineType,
			ReceiverPipelineName: rpName,
		}
		for _, item := range p.processors {
			otelProcessor, supported := item.Component.(OTelProcessor)
			if !supported {
				return nil, nil, fmt.Errorf("processor %q not supported in pipeline %q", item.id, p.pID)
			}
			components, err := otelProcessor.Processors(ctx)
			if err != nil {
				return nil, nil, fmt.Errorf("processor %q has invalid configuration: %w", item.id, err)
			}
			built.Processors = append(built.Processors, components...)
		}
		pipelinesByName[pipelineKey] = built
	}
	return receiversByName, pipelinesByName, nil
}