in pipeline/builder/builder.go [35:93]
// Build wires together a complete pipeline from cfg and returns the Input at
// its head.
//
// The stages are constructed back to front:
//   - every configured endpoint is wrapped in a retrying sender;
//   - every metric gets a dispatcher over the senders it references, fronted
//     by an aggregator when the metric declares aggregation;
//   - a selector fans reports out to those per-metric inputs by metric name;
//   - configured label filters are layered in front of the selector;
//   - heartbeat sources are created and pointed at the resulting head.
//
// The returned Input wraps the head together with a callback that shuts down
// every source created here. An error is returned if the agent ID cannot be
// obtained from p or if the endpoints cannot be created.
func Build(cfg *config.Config, p persistence.Persistence, r stats.Recorder) (pipeline.Input, error) {
	agentID, err := agentid.CreateOrGet(p)
	if err != nil {
		return nil, err
	}

	endpoints, err := createEndpoints(cfg, agentID)
	if err != nil {
		return nil, err
	}

	// One retrying sender per endpoint, indexed by endpoint name so that
	// metrics can look them up below.
	sendersByName := make(map[string]pipeline.Sender)
	for _, ep := range endpoints {
		sendersByName[ep.Name()] = senders.NewRetryingSender(ep, p, r)
	}

	// Per-metric inputs that the selector will route to.
	metricInputs := make(map[string]pipeline.Input)
	for _, m := range cfg.Metrics {
		var targets []pipeline.Sender
		for _, ref := range m.Endpoints {
			// NOTE(review): a name with no entry in sendersByName yields the
			// zero-value Sender here; presumably config validation rules this
			// out before Build is called - confirm.
			targets = append(targets, sendersByName[ref.Name])
		}
		dispatch := &pipeline.InputAdapter{Sender: senders.NewDispatcher(targets, r)}

		// A metric that declares neither aggregation nor passthrough gets no
		// selector input at all.
		switch {
		case m.Aggregation != nil:
			window := time.Duration(m.Aggregation.BufferSeconds) * time.Second
			metricInputs[m.Name] = inputs.NewAggregator(m.Definition, window, dispatch, p)
		case m.Passthrough != nil:
			metricInputs[m.Name] = dispatch
		}
	}

	head := inputs.NewSelector(metricInputs)

	// Layer the filters in front of the selector. Each wrapper becomes the new
	// head, so walking the list backwards leaves the first configured filter
	// outermost.
	for n := len(cfg.Filters); n > 0; n-- {
		addLabels := cfg.Filters[n-1].AddLabels
		if addLabels == nil {
			continue
		}
		head = inputs.NewLabelingInput(head, addLabels.IncludedLabels())
	}

	// Sources feed the head of the pipeline.
	var sourcesList []pipeline.Source
	for _, s := range cfg.Sources {
		if s.Heartbeat != nil {
			sourcesList = append(sourcesList, sources.NewHeartbeat(*s.Heartbeat, head))
		}
	}

	// Shuts down every source and reports all failures together rather than
	// stopping at the first one.
	shutdownSources := func() error {
		var combined *multierror.Error
		for _, s := range sourcesList {
			combined = multierror.Append(combined, s.Shutdown())
		}
		return combined.ErrorOrNil()
	}

	return inputs.NewCallbackInput(head, shutdownSources), nil
}