func()

in pipeline/senders/retry.go [188:234]


// maybeSend works through the retry queue from the front, provided the current
// back-off delay has elapsed since the last attempt (measured against now).
//
// For each entry at the head of the queue:
//   - a successful send is recorded and the entry is dequeued;
//   - a failed send whose entry has been queued longer than *maxQueueTime, or
//     whose error the endpoint does not consider transient, is logged,
//     recorded as a failure and dequeued;
//   - a transient failure on an unexpired entry leaves the entry in place,
//     grows the delay (clamped by bounded to [minDelay, maxDelay] — presumably;
//     confirm against bounded) and stops processing until a later call.
//
// Processing also stops when the queue reports persistence.ErrNotFound on Peek.
// Any other queue error, on Peek or Dequeue, panics.
func (rs *RetryingSender) maybeSend(now time.Time) {
	if now.Before(rs.lastAttempt.Add(rs.delay)) {
		// Still inside the back-off window; nothing to do yet.
		return
	}

	for {
		head := &queueEntry{}
		switch err := rs.queue.Peek(head); {
		case err == persistence.ErrNotFound:
			// Nothing to peek at: we are done for this round.
			return
		case err != nil:
			// The persistent queue could not be read. There is no way to recover from this.
			panic("RetryingSender.maybeSend: loading from retry queue: " + err.Error())
		}

		if err := rs.endpoint.Send(head.Report); err == nil {
			rs.recorder.SendSucceeded(head.Report.Id, rs.endpoint.Name())
		} else {
			// The send failed. An entry that has outlived its maximum queue time is dropped
			// regardless of the error; otherwise only transient errors keep it queued.
			expired := rs.clock.Now().Sub(head.SendTime) > *maxQueueTime
			switch {
			case expired:
				glog.Errorf("RetryingSender.maybeSend [%[1]T - retry expired]: %[1]s", err)
				rs.recorder.SendFailed(head.Report.Id, rs.endpoint.Name())
			case rs.endpoint.IsTransient(err):
				// Keep the entry at the head of the queue and back off before the next attempt.
				rs.lastAttempt = now
				rs.delay = bounded(rs.delay*2, rs.minDelay, rs.maxDelay)
				glog.Warningf("RetryingSender.maybeSend [%[1]T - transient; will retry]: %[1]s", err)
				return
			default:
				glog.Errorf("RetryingSender.maybeSend [%[1]T - will NOT retry]: %[1]s", err)
				rs.recorder.SendFailed(head.Report.Id, rs.endpoint.Name())
			}
		}

		// Reaching this point means the head entry is finished with (delivered, expired or
		// permanently failed), so drop it from the queue and clear the back-off.
		if err := rs.queue.Dequeue(nil); err != nil {
			// The processed entry could not be removed. There is no way to recover from this.
			panic("RetryingSender.maybeSend: dequeuing from retry queue: " + err.Error())
		}
		rs.lastAttempt = now
		rs.delay = 0
	}
}