pipeline/builder/builder.go (94 lines of code) (raw):
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package builder
import (
"errors"
"time"
"github.com/GoogleCloudPlatform/ubbagent/agentid"
"github.com/GoogleCloudPlatform/ubbagent/config"
"github.com/GoogleCloudPlatform/ubbagent/persistence"
"github.com/GoogleCloudPlatform/ubbagent/pipeline"
"github.com/GoogleCloudPlatform/ubbagent/pipeline/endpoints"
"github.com/GoogleCloudPlatform/ubbagent/pipeline/inputs"
"github.com/GoogleCloudPlatform/ubbagent/pipeline/senders"
"github.com/GoogleCloudPlatform/ubbagent/pipeline/sources"
"github.com/GoogleCloudPlatform/ubbagent/stats"
"github.com/hashicorp/go-multierror"
)
// Build builds pipeline containing a configured Aggregator and all of the resources
// (persistence, endpoints) behind it. It returns the pipeline.Input.
func Build(cfg *config.Config, p persistence.Persistence, r stats.Recorder) (pipeline.Input, error) {
agentId, err := agentid.CreateOrGet(p)
if err != nil {
return nil, err
}
endpointList, err := createEndpoints(cfg, agentId)
if err != nil {
return nil, err
}
endpointSenders := make(map[string]pipeline.Sender)
for i := range endpointList {
endpointSenders[endpointList[i].Name()] = senders.NewRetryingSender(endpointList[i], p, r)
}
// Inputs for the resultant Selector.
selectorInputs := make(map[string]pipeline.Input)
for _, metric := range cfg.Metrics {
var msenders []pipeline.Sender
for _, me := range metric.Endpoints {
msenders = append(msenders, endpointSenders[me.Name])
}
di := &pipeline.InputAdapter{Sender: senders.NewDispatcher(msenders, r)}
if metric.Aggregation != nil {
bufferTime := time.Duration(metric.Aggregation.BufferSeconds) * time.Second
selectorInputs[metric.Name] = inputs.NewAggregator(metric.Definition, bufferTime, di, p)
} else if metric.Passthrough != nil {
selectorInputs[metric.Name] = di
}
}
head := inputs.NewSelector(selectorInputs)
// Insert defined filters before selector.
// Iterate in reverse order since the first defined filter should be the head of the pipeline.
for i := len(cfg.Filters) - 1; i >= 0; i-- {
f := cfg.Filters[i]
if f.AddLabels != nil {
head = inputs.NewLabelingInput(head, f.AddLabels.IncludedLabels())
}
}
// Defined metric sources.
var sourcesList []pipeline.Source
for _, src := range cfg.Sources {
if src.Heartbeat != nil {
sourcesList = append(sourcesList, sources.NewHeartbeat(*src.Heartbeat, head))
}
}
cb := func() error {
var err *multierror.Error
for _, src := range sourcesList {
err = multierror.Append(err, src.Shutdown())
}
return err.ErrorOrNil()
}
return inputs.NewCallbackInput(head, cb), nil
}
func createEndpoints(config *config.Config, agentId string) ([]pipeline.Endpoint, error) {
var eps []pipeline.Endpoint
for _, cfgep := range config.Endpoints {
ep, err := createEndpoint(config, &cfgep, agentId)
if err != nil {
// TODO(volkman): close already-created endpoints in event of error?
return nil, err
}
eps = append(eps, ep)
}
return eps, nil
}
func createEndpoint(config *config.Config, cfgep *config.Endpoint, agentId string) (pipeline.Endpoint, error) {
if cfgep.Disk != nil {
return endpoints.NewDiskEndpoint(
cfgep.Name,
cfgep.Disk.ReportDir,
time.Duration(cfgep.Disk.ExpireSeconds)*time.Second,
), nil
}
if cfgep.ServiceControl != nil {
return endpoints.NewServiceControlEndpoint(
cfgep.Name,
cfgep.ServiceControl.ServiceName,
agentId,
cfgep.ServiceControl.ConsumerId,
config.Identities.Get(cfgep.ServiceControl.Identity).GCP.GetServiceAccountKey(),
)
}
// TODO(volkman): support pubsub
return nil, errors.New("unsupported endpoint")
}