in confgenerator/config.go [964:1013]
func (uc *UnifiedConfig) metricsPipelines(ctx context.Context) ([]pipelineInstance, error) {
receivers, err := uc.MetricsReceivers()
if err != nil {
return nil, err
}
var out []pipelineInstance
if uc.Metrics != nil && uc.Metrics.Service != nil {
for pID, p := range uc.Metrics.Service.Pipelines {
for _, rID := range p.ReceiverIDs {
receiver, ok := receivers[rID]
if !ok {
return nil, fmt.Errorf("metrics receiver %q not found", rID)
}
var processors []struct {
id string
Component
}
canMerge := true
for _, prID := range p.ProcessorIDs {
processor, ok := uc.Metrics.Processors[prID]
if !ok {
return nil, fmt.Errorf("processor %q not found", prID)
}
if mr, ok := receiver.(MetricsProcessorMerger); ok && canMerge {
receiver, ok = mr.MergeMetricsProcessor(processor)
if ok {
// Only continue when the receiver can completely merge the processor;
// If the receiver is no longer a MetricsProcessorMerger, or it can't
// completely merge the current processor, break the loop
continue
}
}
canMerge = false
processors = append(processors, struct {
id string
Component
}{prID, processor})
}
out = append(out, pipelineInstance{
pipelineType: "metrics",
pID: pID,
rID: rID,
receiver: receiver,
processors: processors,
})
}
}
}
return out, nil
}