pipeline/pipeline.go (57 lines of code) (raw):
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package pipeline describes a metrics reporting pipeline that accepts reports as input and
// eventually delivers them (possibly after aggregation) to one or more downstream services.
// A pipeline generally consists of a collection of aggregators, dispatchers, and endpoints wrapped
// in RetryingSender objects. Metric values can be provided by an external source (reported via an
// API), or can be generated by a component such as a heartbeat.
//
//                    -> Aggregator -> ...        -> RetryingSender -> Endpoint A
//   HTTP -> Selector -> Aggregator -> Dispatcher -> RetryingSender -> Endpoint B
//                    -> Aggregator -> ...        -> RetryingSender -> Endpoint C
//
//   Heartbeat -> Dispatcher -> RetryingSender -> Endpoint A
//
package pipeline
import (
"sync"
"github.com/GoogleCloudPlatform/ubbagent/metrics"
"github.com/hashicorp/go-multierror"
)
// Input represents a Component that accepts reports from an external source and feeds them into
// the pipeline.
type Input interface {
// Component is embedded, so every Input also provides the Use and Release methods and takes
// part in the same usage tracking as any other pipeline component.
Component
// AddReport adds a report to the pipeline. It returns an error if one is known immediately,
// such as a report that refers to unknown metrics. See aggregator.Aggregator.
//
// NOTE(review): only immediately-known errors are described here; presumably failures that
// occur later, downstream of this Input, are not surfaced through this return value. Confirm
// against the implementations.
AddReport(metrics.MetricReport) error
}
// Component represents a single component in a pipeline. Components can be used downstream of
// multiple other components, enabling creation of fork/join pipeline patterns. Because of this,
// components implement a reference counting strategy that determines when they should clean up
// underlying resources.
type Component interface {
// Use registers new usage of this component. Use should be called whenever this component is
// added downstream of some other component. Every call to Use should eventually be matched by
// a call to Release once that usage ends.
Use()
// Release is called when the caller is no longer using this component. An implementation
// should perform the following steps in order:
//
//   1. Decrement the usage counter. If the usage counter is still greater than 0, return nil;
//      the component is still in use elsewhere and must keep running.
//   2. Gracefully shut down background processes and wait for completion. Following this step,
//      no data shall be sent from this component to downstream components.
//   3. Call Release on all downstream components, waiting for their release operations to
//      complete.
//
// As a result, calling Release on all of the pipeline Input components should result in a
// graceful shutdown of all Components in the correct order.
//
// Release returns an error if it or any of its downstream components generate one.
Release() error
}
// Source represents an autonomous metric data source that runs within the Agent. Unlike Input,
// a Source does not embed Component: its only method is Shutdown.
type Source interface {
// Shutdown instructs the source to stop sending metric data, release any held components, and
// clean up resources. The returned error reports a failure of that shutdown.
Shutdown() error
}
// UsageTracker is a utility that helps track the usage of a Component. It provides Use and
// Release methods, and calls a close function when Release decrements the usage count to 0.
//
// The zero value is ready to use. Because it contains a sync.Mutex, a UsageTracker must not be
// copied after first use; its methods take a pointer receiver.
//
// While the tracker is open, count holds the number of outstanding usages. Once the tracker has
// been closed, count is pinned at -1.
type UsageTracker struct {
	count int
	mu    sync.Mutex
}

// Use registers one additional usage of the tracked component.
//
// It panics if the tracker has already been closed by an earlier Release: a closed tracker is
// never reopened, so using it again is a programming error.
func (u *UsageTracker) Use() {
	u.mu.Lock()
	defer u.mu.Unlock()
	if u.count < 0 {
		panic("UsageTracker is already closed")
	}
	u.count++
}

// Release unregisters one usage. If other usages remain it returns nil without calling close.
// When the last usage is released (or the tracker was never Used at all), the tracker is marked
// closed and close is invoked; its result is returned. Calling Release on a tracker that is
// already closed is a no-op that returns nil, so close runs at most once.
//
// A nil close is treated as "nothing to clean up": the tracker is still marked closed and nil is
// returned.
//
// close runs while the tracker's mutex is held. Concurrent Use and Release calls therefore block
// until it returns, and close must not call Use or Release on the same tracker.
func (u *UsageTracker) Release(close func() error) error {
	u.mu.Lock()
	defer u.mu.Unlock()

	// Already closed: there is nothing left to release.
	if u.count < 0 {
		return nil
	}

	// Decrement usage. If the tracker was never Used, its count will now be -1; that case is
	// still treated as the final release.
	u.count--
	if u.count > 0 {
		return nil
	}

	// Mark the tracker closed before running the callback so that every later Release is a no-op
	// and every later Use panics, regardless of what close returns.
	u.count = -1
	if close == nil {
		return nil
	}
	return close()
}
// ReleaseAll releases every given Component concurrently, one goroutine per component, and
// blocks until all of the Release calls have returned. It returns a multierror if one or more
// calls fail, or nil if all succeed.
func ReleaseAll(components []Component) error {
	var wg sync.WaitGroup
	results := make([]error, len(components))

	for idx, comp := range components {
		wg.Add(1)
		go func(slot int, c Component) {
			defer wg.Done()
			// Each goroutine writes only to its own slot, so the slice needs no extra locking.
			results[slot] = c.Release()
		}(idx, comp)
	}

	wg.Wait()
	return multierror.Append(nil, results...).ErrorOrNil()
}