in confgenerator/confgenerator.go [207:265]
// otelComponents translates this pipeline instance into its OTel building
// blocks.
//
// The first returned map holds every receiver pipeline produced by the
// instance's receiver, keyed by receiver pipeline name. The second holds one
// otel.Pipeline per receiver pipeline, keyed by a name built from the pipeline
// type, the pipeline ID and the receiver pipeline name. Underscores inside the
// pipeline and receiver IDs are doubled, and a single underscore joins the
// name parts.
//
// An error is returned (with both maps nil) when the receiver is not an
// OTelReceiver, when the receiver or any processor reports an invalid
// configuration, when a processor is not an OTelProcessor, or when processors
// are configured on a pipeline whose exporter type is otel.GMP.
func (p pipelineInstance) otelComponents(ctx context.Context) (map[string]otel.ReceiverPipeline, map[string]otel.Pipeline, error) {
	otelReceiver, isOTel := p.receiver.(OTelReceiver)
	if !isOTel {
		return nil, nil, fmt.Errorf("%q is not an otel receiver", p.rID)
	}
	// TODO: Add a way for receivers or processors to decide whether they're compatible with a particular config.
	generated, err := otelReceiver.Pipelines(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("receiver %q has invalid configuration: %w", p.rID, err)
	}

	receiverPipelinesByName := make(map[string]otel.ReceiverPipeline)
	pipelinesByName := make(map[string]otel.Pipeline)

	// The escaped IDs do not depend on the loop, so compute them once.
	escapedReceiverID := strings.ReplaceAll(p.rID, "_", "__")
	escapedPipelineID := strings.ReplaceAll(p.pID, "_", "__")

	for idx, rp := range generated {
		// The first receiver pipeline keeps the bare receiver name; later ones
		// get their index appended.
		rpName := escapedReceiverID
		if idx > 0 {
			rpName = fmt.Sprintf("%s_%d", escapedReceiverID, idx)
		}

		pipelineName := escapedPipelineID + "_" + rpName
		if p.pipelineType != "metrics" {
			// Don't prepend for metrics pipelines to preserve old golden configs.
			pipelineName = fmt.Sprintf("%s_%s", p.pipelineType, pipelineName)
		}

		// When the receiver pipeline has a "logs" processor list, extend it
		// with the log-name components for this receiver.
		if logProcessors, found := rp.Processors["logs"]; found {
			rp.Processors["logs"] = append(logProcessors, otelSetLogNameComponents(ctx, p.rID)...)
		}

		// Prometheus receivers are incompatible with processors, so reject any
		// configured processors when this pipeline type exports via GMP.
		if rp.ExporterTypes[p.pipelineType] == otel.GMP && len(p.processors) > 0 {
			return nil, nil, fmt.Errorf("prometheus receivers are incompatible with Ops Agent processors")
		}

		pipeline := otel.Pipeline{
			Type:                 p.pipelineType,
			ReceiverPipelineName: rpName,
		}
		for _, item := range p.processors {
			otelProcessor, supported := item.Component.(OTelProcessor)
			if !supported {
				return nil, nil, fmt.Errorf("processor %q not supported in pipeline %q", item.id, p.pID)
			}
			components, err := otelProcessor.Processors(ctx)
			if err != nil {
				return nil, nil, fmt.Errorf("processor %q has invalid configuration: %w", item.id, err)
			}
			pipeline.Processors = append(pipeline.Processors, components...)
		}

		receiverPipelinesByName[rpName] = rp
		pipelinesByName[pipelineName] = pipeline
	}
	return receiverPipelinesByName, pipelinesByName, nil
}