func()

in confgenerator/config.go [964:1013]


// metricsPipelines builds one pipelineInstance for every (pipeline, receiver)
// pair listed under the metrics service section of the config.
//
// For each pair, the pipeline's processors are visited in their configured
// order. While the current receiver implements MetricsProcessorMerger, each
// processor is handed to MergeMetricsProcessor and the receiver is replaced by
// whatever that call returns. As soon as a processor is not reported as fully
// merged (or the receiver is not a merger), that processor and every later one
// are kept as standalone processors on the instance.
//
// An error is returned when MetricsReceivers fails, or when a pipeline refers
// to a receiver or processor ID that is not defined. When the metrics section
// or its service section is absent the result is a nil slice and a nil error.
// ctx is not used by this method.
func (uc *UnifiedConfig) metricsPipelines(ctx context.Context) ([]pipelineInstance, error) {
	// procEntry is an alias (not a distinct type) for the anonymous struct
	// that pairs a processor with its ID, so values stay assignable wherever
	// that anonymous struct type is expected.
	type procEntry = struct {
		id string
		Component
	}

	known, err := uc.MetricsReceivers()
	if err != nil {
		return nil, err
	}
	if uc.Metrics == nil || uc.Metrics.Service == nil {
		return nil, nil
	}

	var instances []pipelineInstance
	for pipelineID, pipeline := range uc.Metrics.Service.Pipelines {
		for _, receiverID := range pipeline.ReceiverIDs {
			current, found := known[receiverID]
			if !found {
				return nil, fmt.Errorf("metrics receiver %q not found", receiverID)
			}

			var standalone []procEntry
			// merging stays true only while every processor seen so far has
			// been completely merged into the receiver.
			merging := true
			for _, processorID := range pipeline.ProcessorIDs {
				proc, defined := uc.Metrics.Processors[processorID]
				if !defined {
					return nil, fmt.Errorf("processor %q not found", processorID)
				}
				if merging {
					if merger, isMerger := current.(MetricsProcessorMerger); isMerger {
						var absorbed bool
						// The returned receiver is kept even when absorbed is
						// false; in that case the processor is still appended
						// below as a standalone processor.
						current, absorbed = merger.MergeMetricsProcessor(proc)
						if absorbed {
							continue
						}
					}
				}
				// From here on, no further processors are offered for merging.
				merging = false
				standalone = append(standalone, procEntry{id: processorID, Component: proc})
			}

			instances = append(instances, pipelineInstance{
				pipelineType: "metrics",
				pID:          pipelineID,
				rID:          receiverID,
				receiver:     current,
				processors:   standalone,
			})
		}
	}
	return instances, nil
}