pipeline/inputs/aggregator.go (209 lines of code) (raw):
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inputs
import (
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/GoogleCloudPlatform/ubbagent/clock"
"github.com/GoogleCloudPlatform/ubbagent/metrics"
"github.com/GoogleCloudPlatform/ubbagent/persistence"
"github.com/GoogleCloudPlatform/ubbagent/pipeline"
"github.com/GoogleCloudPlatform/ubbagent/util"
"github.com/golang/glog"
)
const (
	// persistencePrefix namespaces this component's keys within the shared
	// persistence store (see persistenceName).
	persistencePrefix = "aggregator/"
)
// addMsg is the message passed from AddReport to the run goroutine: the report
// to aggregate plus a channel on which the goroutine returns the outcome.
type addMsg struct {
	report metrics.MetricReport
	// result receives the error (or nil) from the aggregation attempt; it is
	// buffered with capacity 1 so run() never blocks on the reply.
	result chan error
}
// Aggregator is the head of the metrics reporting pipeline. It accepts reports from the reporting
// client, buffers and aggregates for a configured amount of time, and sends them downstream.
// See pipeline.Pipeline.
type Aggregator struct {
	clock       clock.Clock        // time source; injectable for tests (see newAggregator)
	metric      metrics.Definition // definition used to validate incoming reports
	bufferTime  time.Duration      // how long a bucket aggregates before being pushed
	input       pipeline.Input     // downstream component that receives pushed reports
	persistence persistence.Persistence
	// currentBucket holds the reports aggregated since the last push; accessed
	// by the run goroutine (and by loadState/newAggregator before it starts).
	currentBucket *bucket
	pushTimer     *time.Timer
	push          chan chan bool
	add           chan addMsg // conveys reports from AddReport to run(); closed by Release
	closed        bool        // guarded by closeMutex; true once Release has closed add
	closeMutex    sync.RWMutex
	wait          sync.WaitGroup // signals completion of the run goroutine
	tracker       pipeline.UsageTracker
}
// NewAggregator creates a new Aggregator instance and starts its goroutine.
// It uses the real wall clock; tests use newAggregator to inject a fake clock.
func NewAggregator(metric metrics.Definition, bufferTime time.Duration, input pipeline.Input, persistence persistence.Persistence) *Aggregator {
	return newAggregator(metric, bufferTime, input, persistence, clock.NewClock())
}
// newAggregator constructs an Aggregator with an injectable clock and starts
// its background goroutine. Any previously persisted bucket is restored before
// the goroutine begins servicing requests.
func newAggregator(metric metrics.Definition, bufferTime time.Duration, input pipeline.Input, persistence persistence.Persistence, clock clock.Clock) *Aggregator {
	a := &Aggregator{
		metric:      metric,
		bufferTime:  bufferTime,
		input:       input,
		persistence: persistence,
		clock:       clock,
		push:        make(chan chan bool),
		add:         make(chan addMsg),
	}
	// Restore saved state; if none exists, begin a fresh bucket stamped now.
	if loaded := a.loadState(); !loaded {
		a.currentBucket = newBucket(clock.Now())
	}
	input.Use()
	a.wait.Add(1)
	go a.run()
	return a
}
// AddReport adds a report. Reports are aggregated when possible, during a time period defined by
// the Aggregator's config object. Two reports can be aggregated if they have the same name, contain
// the same labels, and don't contain overlapping time ranges denoted by StartTime and EndTime.
// Returns an error if the report fails validation against the aggregator's metric definition,
// if the aggregator has already been closed, or if the run goroutine rejects the report.
func (h *Aggregator) AddReport(report metrics.MetricReport) error {
	glog.V(2).Infof("aggregator: received report: %v", report.Name)
	if err := report.Validate(h.metric); err != nil {
		return err
	}
	// Hold the read lock across the send so Release (which takes the write lock
	// before closing h.add) cannot close the channel while we send on it.
	h.closeMutex.RLock()
	defer h.closeMutex.RUnlock()
	if h.closed {
		return errors.New("aggregator: AddReport called on closed aggregator")
	}
	msg := addMsg{
		report: report,
		result: make(chan error, 1),
	}
	// Hand the report to the run goroutine and wait for its verdict.
	h.add <- msg
	return <-msg.result
}
// Use increments the Aggregator's usage count.
// See pipeline.Component.Use.
func (h *Aggregator) Use() {
	h.tracker.Use()
}
// Release decrements the Aggregator's usage count. If it reaches 0, Release instructs the
// Aggregator's goroutine to shutdown. Any currently-aggregated metrics will
// be reported to the downstream sender as part of this process. Release blocks until the operation
// has completed.
// See pipeline.Component.Release.
func (h *Aggregator) Release() error {
	return h.tracker.Release(func() error {
		// Closing h.add under the write lock tells run() to exit; the lock
		// coordinates with AddReport, which sends while holding the read lock.
		h.closeMutex.Lock()
		if !h.closed {
			close(h.add)
			h.closed = true
		}
		h.closeMutex.Unlock()
		// Wait for run() to flush its final bucket and finish.
		h.wait.Wait()
		// Cascade
		return h.input.Release()
	})
}
// run is the Aggregator's main goroutine. Each iteration it arms a timer for
// the moment the current bucket's buffer time elapses, then services either an
// incoming report or the timer. It exits when h.add is closed (see Release),
// pushing any remaining aggregated data downstream before signaling h.wait.
func (h *Aggregator) run() {
	running := true
	for running {
		// Set a timer to fire when the current bucket should be pushed.
		now := h.clock.Now()
		nextFire := now.Add(h.bufferTime - now.Sub(h.currentBucket.CreateTime))
		timer := h.clock.NewTimerAt(nextFire)
		select {
		case msg, ok := <-h.add:
			if ok {
				err := h.currentBucket.addReport(msg.report)
				if err == nil {
					// TODO(volkman): possibly rate-limit persistence, or flush to disk at a defined interval.
					// Perhaps a benchmark to determine whether eager persistence is a bottleneck.
					h.persistState()
				}
				// Reply to the blocked AddReport caller (result is buffered).
				msg.result <- err
			} else {
				// Channel closed: Release was called; exit after a final push.
				running = false
			}
		case now := <-timer.GetC():
			// Time to push the current bucket.
			h.pushBucket(now)
		}
		timer.Stop()
	}
	// Final flush so buffered reports are not lost on shutdown.
	h.pushBucket(h.clock.Now())
	h.wait.Done()
}
// loadState attempts to restore a previously persisted bucket into
// h.currentBucket. It reports whether saved state was found; a missing entry is
// not an error, but any other persistence failure is fatal.
func (h *Aggregator) loadState() bool {
	switch err := h.persistence.Value(h.persistenceName()).Load(&h.currentBucket); err {
	case nil:
		// Saved state was restored.
		return true
	case persistence.ErrNotFound:
		// Nothing persisted yet; caller starts with a fresh bucket.
		return false
	default:
		// Any other failure indicates corrupt or unreadable state.
		panic(fmt.Sprintf("error loading aggregator state: %+v", err))
	}
}
// persistState writes the current bucket to the persistence store so it can be
// recovered after a restart. A storage failure is fatal.
func (h *Aggregator) persistState() {
	// TODO(volkman): always persist a metric's previous end time, even if no bucket is persisted,
	// so that the start time of the next report after a restart is validated.
	if err := h.persistence.Value(h.persistenceName()).Store(h.currentBucket); err != nil {
		panic(fmt.Sprintf("error persisting aggregator state: %+v", err))
	}
}
// pushBucket sends currently-aggregated metrics to the configured MetricSender and resets the
// bucket.
func (h *Aggregator) pushBucket(now time.Time) {
	if h.currentBucket == nil {
		h.currentBucket = newBucket(now)
		return
	}
	// Flatten the bucket's per-name aggregated reports into one list.
	var finished []metrics.MetricReport
	for _, group := range h.currentBucket.Reports {
		for _, ar := range group {
			finished = append(finished, *ar.metricReport())
		}
	}
	if count := len(finished); count > 0 {
		if count == 1 {
			glog.V(2).Infoln("aggregator: sending 1 report")
		} else {
			glog.V(2).Infof("aggregator: sending %v reports", count)
		}
		// Send each report downstream; a failure on one report does not stop
		// delivery of the rest.
		for _, r := range finished {
			if err := h.input.AddReport(r); err != nil {
				glog.Errorf("aggregator: error sending report: %+v", err)
			}
		}
	}
	// Start a fresh bucket and persist the reset state.
	h.currentBucket = newBucket(now)
	h.persistState()
}
// persistenceName returns the persistence-store key for this aggregator's
// state, namespaced by prefix and metric name.
func (h *Aggregator) persistenceName() string {
	return persistencePrefix + h.metric.Name
}
// bucket holds the reports aggregated during one buffer interval. Fields are
// exported so the struct round-trips through the persistence encoder.
type bucket struct {
	CreateTime time.Time // when this bucket began; determines when it is pushed
	// Reports maps a metric name to the aggregated reports for that name
	// (multiple entries per name occur when label sets differ).
	Reports map[string][]*aggregatedReport
}
// aggregatedReport is an extension of MetricReport that supports operations for combining reports.
// It is a defined type (not an alias), so conversion to/from MetricReport is explicit.
type aggregatedReport metrics.MetricReport
// accept possibly aggregates the given MetricReport into this aggregatedReport. Returns true
// if the report was aggregated, or false if the labels or name don't match. On a match the
// values are summed and the aggregated [StartTime, EndTime] range is widened to cover mr's range.
func (ar *aggregatedReport) accept(mr metrics.MetricReport) (bool, error) {
	if mr.Name != ar.Name || !reflect.DeepEqual(mr.Labels, ar.Labels) {
		return false, nil
	}
	// Only one of these values should be non-nil. We rely on prior validation to ensure the proper
	// value (i.e., the one specified in the metrics.Definition) is provided.
	if mr.Value.Int64Value != nil {
		if ar.Value.Int64Value == nil {
			ar.Value.Int64Value = util.NewInt64(0)
		}
		*ar.Value.Int64Value += *mr.Value.Int64Value
	} else if mr.Value.DoubleValue != nil {
		if ar.Value.DoubleValue == nil {
			ar.Value.DoubleValue = util.NewFloat64(0)
		}
		*ar.Value.DoubleValue += *mr.Value.DoubleValue
	}
	// Expand the aggregated start time if the given MetricReport has an earlier start time.
	if mr.StartTime.Before(ar.StartTime) {
		ar.StartTime = mr.StartTime
	}
	// Expand the aggregated end time if the given MetricReport has a later end time.
	// Bug fix: the comparison must be against ar.EndTime (previously ar.StartTime),
	// which could overwrite EndTime with an earlier value or miss updating it.
	if mr.EndTime.After(ar.EndTime) {
		ar.EndTime = mr.EndTime
	}
	return true, nil
}
// metricReport converts this aggregatedReport back to a *metrics.MetricReport.
// This is a pure type conversion; no data is copied.
func (ar *aggregatedReport) metricReport() *metrics.MetricReport {
	return (*metrics.MetricReport)(ar)
}
// newBucket creates an empty bucket whose creation time is t.
func newBucket(t time.Time) *bucket {
	b := &bucket{CreateTime: t}
	b.Reports = map[string][]*aggregatedReport{}
	return b
}
// addReport merges mr into an existing aggregatedReport with a matching name
// and label set when possible; otherwise it appends mr as a new entry under
// its metric name. Returns the first aggregation error encountered.
func (b *bucket) addReport(mr metrics.MetricReport) error {
	existing := b.Reports[mr.Name]
	for _, candidate := range existing {
		merged, err := candidate.accept(mr)
		if err != nil {
			return err
		}
		if merged {
			// Report was folded into an existing entry; nothing more to do.
			return nil
		}
	}
	// No compatible entry: record mr as a new aggregatedReport.
	b.Reports[mr.Name] = append(existing, (*aggregatedReport)(&mr))
	return nil
}