func()

in pipeline/senders/retry.go [147:185]


// run is the sender's event loop. It performs an initial maybeSend(start) and
// then repeatedly waits for one of two events:
//
//   - a message arriving on rs.add, which is enqueued on rs.queue; the enqueue
//     result is reported back on the message's result channel and, on success,
//     maybeSend is invoked with the entry's SendTime;
//   - the retry timer firing, which invokes maybeSend with the fire time.
//
// The loop exits when rs.add is closed, at which point the pending timer is
// stopped and rs.wait.Done() is called to signal that the loop has finished.
func (rs *RetryingSender) run(start time.Time) {
	// Start with an initial call to maybeSend() to start sending any persisted state.
	rs.maybeSend(start)
	for {
		var timer clock.Timer
		if rs.delay == 0 {
			// A delay of 0 means we're not retrying. Disable the retry timer; we'll wake up when a
			// new report is sent.
			timer = clock.NewStoppedTimer()
		} else {
			// Compute the next retry time. The expression below is equivalent to
			// lastAttempt + delay + [0,1000) ms of jitter, expressed relative to the current time.
			now := rs.clock.Now()
			jitter := time.Duration(rand.Int63n(1000)) * time.Millisecond
			nextFire := now.Add(rs.delay - now.Sub(rs.lastAttempt)).Add(jitter)
			timer = rs.clock.NewTimerAt(nextFire)
		}
		select {
		case msg, ok := <-rs.add:
			if !ok {
				// Channel was closed: release this iteration's timer before signalling completion,
				// so no armed timer is left behind when the loop exits.
				timer.Stop()
				rs.wait.Done()
				return
			}
			if err := rs.queue.Enqueue(msg.entry); err != nil {
				// Report the failure to the caller and skip the send attempt. The break leaves
				// only the select, so the timer is still stopped below.
				msg.result <- err
				break
			}

			// Successfully queued the message.
			msg.result <- nil
			rs.maybeSend(msg.entry.SendTime)
		case now := <-timer.GetC():
			rs.maybeSend(now)
		}
		// A fresh timer is created on every iteration, so always release the current one.
		timer.Stop()
	}
}