in appinsights/inmemorychannel.go [365:440]
func (channel *InMemoryChannel) transmitRetry(items telemetryBufferItems, retry bool, retryTimeout time.Duration) {
payload := items.serialize()
retryTimeRemaining := retryTimeout
for _, wait := range submit_retries {
result, err := channel.transmitter.Transmit(payload, items)
if err == nil && result != nil && result.IsSuccess() {
return
}
if !retry {
diagnosticsWriter.Write("Refusing to retry telemetry submission (retry==false)")
return
}
// Check for success, determine if we need to retry anything
if result != nil {
if result.CanRetry() {
// Filter down to failed items
payload, items = result.GetRetryItems(payload, items)
if len(payload) == 0 || len(items) == 0 {
return
}
} else {
diagnosticsWriter.Write("Cannot retry telemetry submission")
return
}
// Check for throttling
if result.IsThrottled() {
if result.retryAfter != nil {
diagnosticsWriter.Printf("Channel is throttled until %s", *result.retryAfter)
channel.throttle.RetryAfter(*result.retryAfter)
} else {
// TODO: Pick a time
}
}
}
if retryTimeout > 0 {
// We're on a time schedule here. Make sure we don't try longer
// than we have been allowed.
if retryTimeRemaining < wait {
// One more chance left -- we'll wait the max time we can
// and then retry on the way out.
currentClock.Sleep(retryTimeRemaining)
break
} else {
// Still have time left to go through the rest of the regular
// retry schedule
retryTimeRemaining -= wait
}
}
diagnosticsWriter.Printf("Waiting %s to retry submission", wait)
currentClock.Sleep(wait)
// Wait if the channel is throttled and we're not on a schedule
if channel.IsThrottled() && retryTimeout == 0 {
diagnosticsWriter.Printf("Channel is throttled; extending wait time.")
ch := channel.throttle.NotifyWhenReady()
result := <-ch
close(ch)
if !result {
return
}
}
}
// One final try
_, err := channel.transmitter.Transmit(payload, items)
if err != nil {
diagnosticsWriter.Write("Gave up transmitting payload; exhausted retries")
}
}