func Build()

in pipeline/builder/builder.go [35:93]


func Build(cfg *config.Config, p persistence.Persistence, r stats.Recorder) (pipeline.Input, error) {
	agentId, err := agentid.CreateOrGet(p)
	if err != nil {
		return nil, err
	}
	endpointList, err := createEndpoints(cfg, agentId)
	if err != nil {
		return nil, err
	}
	endpointSenders := make(map[string]pipeline.Sender)
	for i := range endpointList {
		endpointSenders[endpointList[i].Name()] = senders.NewRetryingSender(endpointList[i], p, r)
	}

	// Inputs for the resultant Selector.
	selectorInputs := make(map[string]pipeline.Input)
	for _, metric := range cfg.Metrics {
		var msenders []pipeline.Sender
		for _, me := range metric.Endpoints {
			msenders = append(msenders, endpointSenders[me.Name])
		}
		di := &pipeline.InputAdapter{Sender: senders.NewDispatcher(msenders, r)}
		if metric.Aggregation != nil {
			bufferTime := time.Duration(metric.Aggregation.BufferSeconds) * time.Second
			selectorInputs[metric.Name] = inputs.NewAggregator(metric.Definition, bufferTime, di, p)
		} else if metric.Passthrough != nil {
			selectorInputs[metric.Name] = di
		}
	}

	head := inputs.NewSelector(selectorInputs)

	// Insert defined filters before selector.
	// Iterate in reverse order since the first defined filter should be the head of the pipeline.
	for i := len(cfg.Filters) - 1; i >= 0; i-- {
		f := cfg.Filters[i]
		if f.AddLabels != nil {
			head = inputs.NewLabelingInput(head, f.AddLabels.IncludedLabels())
		}
	}

	// Defined metric sources.
	var sourcesList []pipeline.Source
	for _, src := range cfg.Sources {
		if src.Heartbeat != nil {
			sourcesList = append(sourcesList, sources.NewHeartbeat(*src.Heartbeat, head))
		}
	}

	cb := func() error {
		var err *multierror.Error
		for _, src := range sourcesList {
			err = multierror.Append(err, src.Shutdown())
		}
		return err.ErrorOrNil()
	}

	return inputs.NewCallbackInput(head, cb), nil
}