in pipeline/senders/retry.go [188:234]
// maybeSend tries to push queued reports to the endpoint. It is a no-op while
// now is still before rs.lastAttempt + rs.delay. Otherwise it repeatedly peeks
// at the queue and sends the peeked entry until Peek reports
// persistence.ErrNotFound or a retryable failure stops the loop.
//
// Per-entry outcomes:
//   - send succeeds: SendSucceeded is recorded, the entry is dequeued and
//     rs.delay is reset to 0.
//   - send fails and the entry is older than *maxQueueTime, or the endpoint
//     does not classify the error as transient: the error is logged,
//     SendFailed is recorded, the entry is dequeued and rs.delay is reset to 0.
//   - send fails with a transient error on an unexpired entry: the entry stays
//     queued, rs.delay is doubled and passed through bounded with rs.minDelay
//     and rs.maxDelay, and the method returns.
//
// It panics if Peek fails with anything other than persistence.ErrNotFound, or
// if Dequeue fails.
func (rs *RetryingSender) maybeSend(now time.Time) {
	nextAttempt := rs.lastAttempt.Add(rs.delay)
	if now.Before(nextAttempt) {
		// Still inside the back-off window.
		return
	}
	for {
		var head queueEntry
		peekErr := rs.queue.Peek(&head)
		if peekErr == persistence.ErrNotFound {
			// Nothing (more) to send.
			return
		}
		if peekErr != nil {
			// The persistent queue could not be read; there is no way to recover from this.
			panic("RetryingSender.maybeSend: loading from retry queue: " + peekErr.Error())
		}

		sendErr := rs.endpoint.Send(head.Report)
		switch {
		case sendErr == nil:
			// Delivered.
			rs.recorder.SendSucceeded(head.Report.Id, rs.endpoint.Name())
		case rs.clock.Now().Sub(head.SendTime) > *maxQueueTime:
			// The entry has outlived its maximum queue time; give up on it regardless of
			// the kind of error. Note that age is measured with rs.clock.Now(), not now.
			glog.Errorf("RetryingSender.maybeSend [%[1]T - retry expired]: %[1]s", sendErr)
			rs.recorder.SendFailed(head.Report.Id, rs.endpoint.Name())
		case rs.endpoint.IsTransient(sendErr):
			// Retryable failure on a still-valid entry: keep it queued, back off, and
			// stop processing until the next attempt is due.
			rs.lastAttempt = now
			rs.delay = bounded(rs.delay*2, rs.minDelay, rs.maxDelay)
			glog.Warningf("RetryingSender.maybeSend [%[1]T - transient; will retry]: %[1]s", sendErr)
			return
		default:
			// Permanent failure; retrying would not help.
			glog.Errorf("RetryingSender.maybeSend [%[1]T - will NOT retry]: %[1]s", sendErr)
			rs.recorder.SendFailed(head.Report.Id, rs.endpoint.Name())
		}

		// Reaching here means the entry is finished with (sent, expired, or permanently
		// failed): drop it from the queue and clear the retry delay.
		if dequeueErr := rs.queue.Dequeue(nil); dequeueErr != nil {
			// The processed entry could not be removed; there is no way to recover from this.
			panic("RetryingSender.maybeSend: dequeuing from retry queue: " + dequeueErr.Error())
		}
		rs.lastAttempt = now
		rs.delay = 0
	}
}