in pipeline/senders/retry.go [147:185]
// run is the sender's event loop. It makes one initial maybeSend(start) call and
// then repeatedly waits for either a new message on rs.add or the retry timer,
// calling maybeSend after each event. It returns once rs.add is closed, after
// stopping the pending timer and calling rs.wait.Done().
func (rs *RetryingSender) run(start time.Time) {
	// Start with an initial call to maybeSend() to start sending any persisted state.
	rs.maybeSend(start)
	for {
		var timer clock.Timer
		if rs.delay == 0 {
			// A delay of 0 means we're not retrying. Disable the retry timer; we'll wake up
			// when a new report is sent.
			timer = clock.NewStoppedTimer()
		} else {
			// Compute the next retry time. The time already elapsed since the last attempt is
			// subtracted from the delay, so the timer fires rs.delay after rs.lastAttempt,
			// plus [0,1000) ms of jitter.
			now := rs.clock.Now()
			jitter := time.Duration(rand.Int63n(1000)) * time.Millisecond
			nextFire := now.Add(rs.delay - now.Sub(rs.lastAttempt)).Add(jitter)
			timer = rs.clock.NewTimerAt(nextFire)
		}
		select {
		case msg, ok := <-rs.add:
			if !ok {
				// Channel was closed. Stop this iteration's timer before returning so it is
				// not left armed, then signal completion.
				timer.Stop()
				rs.wait.Done()
				return
			}
			if err := rs.queue.Enqueue(msg.entry); err != nil {
				msg.result <- err
				// Leaves the select only (not the for loop); the timer is stopped below.
				break
			}
			// Successfully queued the message.
			msg.result <- nil
			rs.maybeSend(msg.entry.SendTime)
		case now := <-timer.GetC():
			rs.maybeSend(now)
		}
		// A fresh timer is created on every iteration, so release this one.
		timer.Stop()
	}
}