pipeline/senders/dispatcher.go (64 lines of code) (raw):
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package senders
import (
"sync"
"github.com/GoogleCloudPlatform/ubbagent/metrics"
"github.com/GoogleCloudPlatform/ubbagent/pipeline"
"github.com/GoogleCloudPlatform/ubbagent/stats"
"github.com/hashicorp/go-multierror"
)
// Dispatcher is a Sender that fans out to other Sender instances. Generally,
// this will be a collection of Endpoints wrapped in RetryingSender objects.
// Instances are created with NewDispatcher.
type Dispatcher struct {
	// senders are the downstream Sender instances that Send, Release and Endpoints iterate over.
	senders []pipeline.Sender
	// tracker backs the Use and Release methods, which both delegate to it.
	tracker pipeline.UsageTracker
	// recorder is told, at the start of every Send, which endpoints will handle the report.
	recorder stats.Recorder
}
// Send hands the report to every underlying Sender concurrently and blocks until all of them have
// returned. The errors produced by the individual senders are merged into a single multierror
// value; the result is nil when no sender reported an error.
func (d *Dispatcher) Send(report metrics.StampedMetricReport) error {
	// Announce to the recorder which endpoints are going to handle this report before any
	// sender gets to see it.
	handlers := d.Endpoints()
	d.recorder.Register(report.Id, handlers)

	// One result slot per sender. Every goroutine writes only to its own slot, so the slice
	// needs no additional synchronization beyond the WaitGroup.
	results := make([]error, len(d.senders))
	var pending sync.WaitGroup
	pending.Add(len(d.senders))
	for idx := range d.senders {
		go func(slot int, target pipeline.Sender) {
			// A failing downstream sender is assumed to report its own error to the stats
			// recorder, so the error is merely collected here.
			results[slot] = target.Send(report)
			pending.Done()
		}(idx, d.senders[idx])
	}
	pending.Wait()

	merged := multierror.Append(nil, results...)
	return merged.ErrorOrNil()
}
// Use increments the Dispatcher's usage count by forwarding the call to its usage tracker.
// It does not touch the underlying senders.
// See pipeline.Component.Use.
func (d *Dispatcher) Use() {
	d.tracker.Use()
}
// Release decrements the Dispatcher's usage count. Once the count drops to 0, every underlying
// sender is released in its own goroutine and Release waits for all of them to finish. Errors
// from the individual releases are merged into one multierror value, which is nil when none
// occurred.
// See pipeline.Component.Release.
func (d *Dispatcher) Release() error {
	// releaseAll is the callback handed to the usage tracker; it tears down all senders in
	// parallel and reports their combined outcome.
	releaseAll := func() error {
		outcomes := make([]error, len(d.senders))
		var done sync.WaitGroup
		done.Add(len(d.senders))
		for idx := range d.senders {
			go func(slot int, target pipeline.Sender) {
				// Each goroutine owns exactly one slot of outcomes.
				outcomes[slot] = target.Release()
				done.Done()
			}(idx, d.senders[idx])
		}
		done.Wait()

		merged := multierror.Append(nil, outcomes...)
		return merged.ErrorOrNil()
	}
	return d.tracker.Release(releaseAll)
}
// Endpoints returns the endpoint names reported by the Dispatcher's senders. Every name is
// listed once, in the order in which it is first encountered while walking the senders in
// sequence. The result is nil when no sender reports any endpoint.
func (d *Dispatcher) Endpoints() (handlers []string) {
	// known remembers which names have already been added to the result.
	known := make(map[string]bool)
	for idx := range d.senders {
		for _, name := range d.senders[idx].Endpoints() {
			if known[name] {
				continue
			}
			known[name] = true
			handlers = append(handlers, name)
		}
	}
	return handlers
}
// NewDispatcher builds a Dispatcher that fans out to the given senders and registers reports
// with the given recorder. Use is called on every sender before the Dispatcher is returned.
// The senders slice is stored as passed in, without being copied.
func NewDispatcher(senders []pipeline.Sender, recorder stats.Recorder) *Dispatcher {
	for idx := range senders {
		senders[idx].Use()
	}
	dispatcher := &Dispatcher{
		senders:  senders,
		recorder: recorder,
	}
	return dispatcher
}