pipeline/senders/retry.go
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package senders

import (
"errors"
"flag"
"math/rand"
"path"
"sync"
"time"
"github.com/GoogleCloudPlatform/ubbagent/clock"
"github.com/GoogleCloudPlatform/ubbagent/metrics"
"github.com/GoogleCloudPlatform/ubbagent/persistence"
"github.com/GoogleCloudPlatform/ubbagent/pipeline"
"github.com/GoogleCloudPlatform/ubbagent/stats"
"github.com/golang/glog"
)
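
// persistPrefix is the path prefix under which per-endpoint retry queues are stored in persistence.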
const (
persistPrefix = "epqueue"
)

var minRetryDelay = flag.Duration("min_retry_delay", 2*time.Second, "minimum exponential backoff delay")
var maxRetryDelay = flag.Duration("max_retry_delay", 60*time.Second, "maximum exponential backoff delay")
var maxQueueTime = flag.Duration("max_queue_time", 3*time.Hour, "maximum amount of time to keep an entry in the retry queue")

// RetryingSender is a Sender that handles sending reports to a remote endpoint.
// It buffers reports and retries in the event of a send failure, using exponential backoff between
// retry attempts. Minimum and maximum delays are configurable via the "min_retry_delay" and
// "max_retry_delay" flags.
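//
// A minimal usage sketch (the endpoint, persist, recorder, and report values are assumed to be
// constructed elsewhere):
//
//	rs := NewRetryingSender(endpoint, persist, recorder)
//	defer rs.Release()
//	if err := rs.Send(report); err != nil {
//		// The report could not be built or queued; reports that were queued are retried internally.
//	}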
type RetryingSender struct {
endpoint pipeline.Endpoint
queue persistence.Queue
recorder stats.Recorder
clock clock.Clock
lastAttempt time.Time
delay time.Duration
minDelay time.Duration
maxDelay time.Duration
add chan addMsg
closed bool
closeMutex sync.RWMutex
wait sync.WaitGroup
tracker pipeline.UsageTracker
}
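
// addMsg carries a new queueEntry from Send to the run goroutine. The result channel receives
// the outcome of the enqueue operation.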
type addMsg struct {
entry queueEntry
result chan error
}
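
// queueEntry is the value stored in the persistent retry queue: the report built for the endpoint
// plus the time at which it was originally submitted.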
type queueEntry struct {
Report pipeline.EndpointReport
SendTime time.Time
}

// NewRetryingSender creates a new RetryingSender for endpoint, storing state in persistence.
func NewRetryingSender(endpoint pipeline.Endpoint, persistence persistence.Persistence, recorder stats.Recorder) *RetryingSender {
return newRetryingSender(endpoint, persistence, recorder, clock.NewClock(), *minRetryDelay, *maxRetryDelay)
}
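
// newRetryingSender constructs a RetryingSender with an explicit clock and retry delay bounds and
// starts its run goroutine. NewRetryingSender wraps it with the flag-configured defaults.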
func newRetryingSender(endpoint pipeline.Endpoint, persistence persistence.Persistence, recorder stats.Recorder, clock clock.Clock, minDelay, maxDelay time.Duration) *RetryingSender {
rs := &RetryingSender{
endpoint: endpoint,
queue: persistence.Queue(persistenceName(endpoint.Name())),
recorder: recorder,
clock: clock,
minDelay: minDelay,
maxDelay: maxDelay,
add: make(chan addMsg, 1),
}
endpoint.Use()
rs.wait.Add(1)
go rs.run(clock.Now())
return rs
}
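
// Send builds an endpoint-specific report from the given StampedMetricReport and hands it to the
// run goroutine for queuing. It returns an error if the sender has been closed, if report building
// fails, or if the entry cannot be enqueued; such failures are also recorded with the
// stats.Recorder.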
func (rs *RetryingSender) Send(report metrics.StampedMetricReport) error {
rs.closeMutex.RLock()
defer rs.closeMutex.RUnlock()
if rs.closed {
return errors.New("RetryingSender: Send called on closed sender")
}
epr, err := rs.endpoint.BuildReport(report)
if err != nil {
rs.recorder.SendFailed(report.Id, rs.endpoint.Name())
return err
}
msg := addMsg{
entry: queueEntry{epr, rs.clock.Now()},
result: make(chan error),
}
rs.add <- msg
err = <-msg.result
if err != nil {
// Record this immediate failure.
rs.recorder.SendFailed(report.Id, rs.endpoint.Name())
}
return err
}
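
// Endpoints returns a single-element slice containing the name of the wrapped Endpoint.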
func (rs *RetryingSender) Endpoints() []string {
return []string{rs.endpoint.Name()}
}

// Use increments the RetryingSender's usage count.
// See pipeline.Component.Use.
func (rs *RetryingSender) Use() {
rs.tracker.Use()
}

// Release decrements the RetryingSender's usage count. If the count reaches 0, Release instructs
// the RetryingSender to gracefully shut down. Any reports that have not yet been sent are
// persisted to disk, and the wrapped Endpoint is released. Release blocks until the operation has
// completed.
// See pipeline.Component.Release.
func (rs *RetryingSender) Release() error {
return rs.tracker.Release(func() error {
rs.closeMutex.Lock()
if !rs.closed {
close(rs.add)
rs.closed = true
}
rs.closeMutex.Unlock()
rs.wait.Wait()
return rs.endpoint.Release()
})
}
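
// run is the sender's main loop. It accepts newly queued reports from the add channel, wakes on a
// backoff timer to retry pending sends, and exits once the add channel is closed by Release.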
func (rs *RetryingSender) run(start time.Time) {
// Make an initial call to maybeSend to begin sending any previously persisted reports.
rs.maybeSend(start)
for {
var timer clock.Timer
if rs.delay == 0 {
// A delay of 0 means we're not retrying. Disable the retry timer; we'll wake up when a new
// report arrives on the add channel.
timer = clock.NewStoppedTimer()
} else {
// Compute the next retry time: the last attempt time plus the current delay plus [0,1000) ms of jitter.
now := rs.clock.Now()
jitter := time.Duration(rand.Int63n(1000)) * time.Millisecond
nextFire := now.Add(rs.delay - now.Sub(rs.lastAttempt)).Add(jitter)
timer = rs.clock.NewTimerAt(nextFire)
}
select {
case msg, ok := <-rs.add:
if ok {
err := rs.queue.Enqueue(msg.entry)
if err != nil {
msg.result <- err
break
}
// Successfully queued the message
msg.result <- nil
rs.maybeSend(msg.entry.SendTime)
} else {
// Channel was closed.
rs.wait.Done()
return
}
case now := <-timer.GetC():
rs.maybeSend(now)
}
timer.Stop()
}
}

// maybeSend attempts to send all queued reports to the endpoint if the current retry delay has
// elapsed since the last attempt. It stops early if a transient, unexpired send failure occurs.
func (rs *RetryingSender) maybeSend(now time.Time) {
if now.Before(rs.lastAttempt.Add(rs.delay)) {
// Not time yet.
return
}
for {
entry := &queueEntry{}
if loaderr := rs.queue.Peek(entry); loaderr == persistence.ErrNotFound {
break
} else if loaderr != nil {
// We failed to load from the persistent queue. This isn't recoverable.
panic("RetryingSender.maybeSend: loading from retry queue: " + loaderr.Error())
}
if senderr := rs.endpoint.Send(entry.Report); senderr != nil {
// We've encountered a send error. If the error is considered transient and the entry hasn't
// reached its maximum queue time, we'll leave it in the queue and retry. Otherwise it's
// removed from the queue, logged, and recorded as a failure.
expired := rs.clock.Now().Sub(entry.SendTime) > *maxQueueTime
if !expired && rs.endpoint.IsTransient(senderr) {
// Set next attempt
rs.lastAttempt = now
rs.delay = bounded(rs.delay*2, rs.minDelay, rs.maxDelay)
glog.Warningf("RetryingSender.maybeSend [%[1]T - transient; will retry]: %[1]s", senderr)
break
} else if expired {
glog.Errorf("RetryingSender.maybeSend [%[1]T - retry expired]: %[1]s", senderr)
rs.recorder.SendFailed(entry.Report.Id, rs.endpoint.Name())
} else {
glog.Errorf("RetryingSender.maybeSend [%[1]T - will NOT retry]: %[1]s", senderr)
rs.recorder.SendFailed(entry.Report.Id, rs.endpoint.Name())
}
} else {
// Send was successful.
rs.recorder.SendSucceeded(entry.Report.Id, rs.endpoint.Name())
}
// At this point we've either successfully sent the report or given up on it (the error was
// non-transient or the entry expired). In either case, the report is removed from the queue and
// the retry delay is reset.
if poperr := rs.queue.Dequeue(nil); poperr != nil {
// We failed to pop the sent entry off the queue. This isn't recoverable.
panic("RetryingSender.maybeSend: dequeuing from retry queue: " + poperr.Error())
}
rs.lastAttempt = now
rs.delay = 0
}
}
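
// bounded clamps val to the inclusive range [min, max].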
func bounded(val, min, max time.Duration) time.Duration {
if val < min {
return min
}
if val > max {
return max
}
return val
}
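
// persistenceName returns the persistence queue name for the given endpoint name, prefixed with
// persistPrefix.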
func persistenceName(name string) string {
return path.Join(persistPrefix, name)
}