pipeline/inputs/inputs.go (82 lines of code) (raw):
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inputs
import (
"fmt"
"github.com/GoogleCloudPlatform/ubbagent/metrics"
"github.com/GoogleCloudPlatform/ubbagent/pipeline"
"github.com/golang/glog"
"github.com/hashicorp/go-multierror"
)
// selector is a pipeline.Input that routes a MetricReport to another pipeline.Input based on
// the metric name.
type selector struct {
// Map of metric names to pipeline.Input objects. AddReport looks up the target Input here, and
// Release releases every Input stored in it.
inputs map[string]pipeline.Input
// Usage tracker that Use and Release delegate to.
tracker pipeline.UsageTracker
}
// AddReport forwards the report to the Input registered under the report's metric name and
// returns that Input's result. If no Input is registered for the name, an error identifying the
// unknown metric is returned and the report is not forwarded.
func (s *selector) AddReport(report metrics.MetricReport) error {
	if target, found := s.inputs[report.Name]; found {
		return target.AddReport(report)
	}
	return fmt.Errorf("selector: unknown metric: %v", report.Name)
}
// Use increments the selector's usage count by delegating to its usage tracker.
// See pipeline.Component.Use.
func (s *selector) Use() {
s.tracker.Use()
}
// Release decrements the selector's usage count through its usage tracker. The tracker is handed
// a callback (expected to run once the count reaches 0) that collects every registered Input and
// releases them together via pipeline.ReleaseAll, returning that call's error.
// See pipeline.Component.Release.
func (s *selector) Release() error {
	releaseInputs := func() error {
		components := make([]pipeline.Component, 0, len(s.inputs))
		for _, input := range s.inputs {
			components = append(components, input)
		}
		return pipeline.ReleaseAll(components)
	}
	return s.tracker.Release(releaseInputs)
}
// NewSelector creates an Input that selects from the given inputs based on metric name. The inputs
// parameter is a map of metric name to the corresponding Input that handles it.
//
// Use is called once on every map value, so an Input registered under several names is used once
// per name. The map is copied rather than retained: Release walks the selector's map to release
// exactly the Inputs that were used here, and a caller mutating its own map afterwards must not
// be able to unbalance those Use/Release pairs.
func NewSelector(inputs map[string]pipeline.Input) pipeline.Input {
	owned := make(map[string]pipeline.Input, len(inputs))
	for name, input := range inputs {
		input.Use()
		owned[name] = input
	}
	return &selector{inputs: owned}
}
// callbackInput is a pipeline.Input that wraps a delegate Input and runs a shutdown hook when it
// is released.
type callbackInput struct {
// Input that receives every report and is released after the shutdown hook has run.
delegate pipeline.Input
// Hook invoked during Release, before the delegate is released.
shutdown func() error
// Usage tracker that Use and Release delegate to.
tracker pipeline.UsageTracker
}
// AddReport passes the report unchanged to the delegate Input and returns the delegate's result.
func (p *callbackInput) AddReport(report metrics.MetricReport) error {
return p.delegate.AddReport(report)
}
// Use registers a usage of the callbackInput by delegating to its usage tracker.
func (p *callbackInput) Use() {
p.tracker.Use()
}
// Release releases the callbackInput through its usage tracker. The callback handed to the
// tracker first runs the shutdown hook and then releases the delegate; the delegate is released
// even if the hook fails. Errors from both steps are combined into a single error, which is nil
// when neither step failed.
func (p *callbackInput) Release() error {
	teardown := func() error {
		shutdownErr := p.shutdown()
		delegateErr := p.delegate.Release()
		merged := multierror.Append(shutdownErr, delegateErr)
		return merged.ErrorOrNil()
	}
	return p.tracker.Release(teardown)
}
// NewCallbackInput wraps delegate in an Input that invokes the shutdown hook when the returned
// Input is released. The hook runs before the delegate itself is released. Use is called on the
// delegate once during construction.
func NewCallbackInput(delegate pipeline.Input, shutdown func() error) pipeline.Input {
	wrapped := &callbackInput{
		delegate: delegate,
		shutdown: shutdown,
	}
	wrapped.delegate.Use()
	return wrapped
}
// labelingInput is a pipeline.Input that adds a fixed set of labels to each report before passing
// it to a delegate Input.
type labelingInput struct {
// Embedded Component supplying the Component methods; NewLabelingInput sets it to the delegate.
pipeline.Component
// Input that receives each report after labeling.
delegate pipeline.Input
// Labels to add to incoming reports, keyed by label name.
labels map[string]string
}
// AddReport adds the configured labels to the report and forwards it to the delegate, returning
// the delegate's result. A label that is already present on the report keeps its original value
// and a warning is logged for it.
//
// The caller's report is never modified: although report is received by value, its Labels map
// would still alias the caller's map, so the map is cloned before the first label is written.
func (i *labelingInput) AddReport(report metrics.MetricReport) error {
	cloned := false
	for k, v := range i.labels {
		if _, exists := report.Labels[k]; exists {
			glog.Warningf("labelingInput: received report that already had label '%v'; skipping", k)
			continue
		}
		if !cloned {
			// Copy-on-write: detach from the caller's map (which may be nil) only once a label
			// actually has to be added.
			merged := make(map[string]string, len(report.Labels)+len(i.labels))
			for name, value := range report.Labels {
				merged[name] = value
			}
			report.Labels = merged
			cloned = true
		}
		report.Labels[k] = v
	}
	return i.delegate.AddReport(report)
}
// NewLabelingInput returns an Input that attaches the given labels to every incoming
// MetricReport and then hands the report to delegate. When a report already carries a label of
// the same name, the report's own value wins and a warning is logged. The delegate is also
// installed as the embedded Component, so Component method calls on the returned Input go
// directly to the delegate.
func NewLabelingInput(delegate pipeline.Input, labels map[string]string) pipeline.Input {
	labeler := &labelingInput{
		Component: delegate,
		delegate:  delegate,
		labels:    labels,
	}
	return labeler
}