retry.go (75 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
// These functions are based on: https://github.com/googleapis/google-cloud-go/blob/6a9c12a395245d8500c267437c2dfa897049a719/bigquery/storage/managedwriter/retry.go
import (
"errors"
"io"
"math/rand"
"strings"
"sync"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// retryPredicate reports whether an append that failed with err should be
// retried (re-enqueued). It is used for higher-level retries, when enqueuing
// appends onto a bidi channel.
//
// A nil error is never retried. For errors that do not carry a gRPC status,
// only io.EOF (which can occur when the connection closes) is retryable; all
// others, including context errors, are not. For status errors the decision
// is made on the status code, with ResourceExhausted being retryable only
// when the message identifies the AppendRows throughput quota.
func retryPredicate(err error) (shouldRetry bool) {
	if err == nil {
		return false
	}

	st, isStatus := status.FromError(err)
	if !isStatus {
		// Non-status errors: EOF signals a closed connection and is worth
		// another attempt; everything else is terminal.
		return errors.Is(err, io.EOF)
	}

	switch st.Code() {
	case codes.Aborted, codes.Canceled, codes.DeadlineExceeded,
		codes.FailedPrecondition, codes.Internal, codes.Unavailable:
		return true
	case codes.ResourceExhausted:
		// Note: internal b/246031522 opened to give this a structured error
		// and avoid string parsing. Should be a QuotaFailure or similar.
		return strings.HasPrefix(st.Message(), "Exceeds 'AppendRows throughput' quota")
	default:
		return false
	}
}
// statelessRetryer is used for backing off within a continuous process, like processing the responses
// from the receive side of the bidi stream. An individual item in that process has a notion of an attempt
// count, and we use maximum retries as a way of evicting bad items.
type statelessRetryer struct {
mu sync.Mutex // guards r
// r is the random source that pause draws jitter from.
r *rand.Rand
// minBackoff is the fixed base delay included in every pause.
minBackoff time.Duration
// jitter bounds the random extra delay that pause adds on top of minBackoff.
jitter time.Duration
// maxAttempts is the attempt count at which Retry stops allowing retries.
maxAttempts int
}
// newStatelessRetryer builds a statelessRetryer whose maxAttempts is
// numRetryAttempts, with a 50ms minimum backoff and a one-second jitter
// window. Its random source is seeded from the current wall-clock time.
func newStatelessRetryer(numRetryAttempts int) *statelessRetryer {
	seed := time.Now().UnixNano()
	sr := &statelessRetryer{
		maxAttempts: numRetryAttempts,
		minBackoff:  50 * time.Millisecond,
		jitter:      time.Second,
	}
	sr.r = rand.New(rand.NewSource(seed))
	return sr
}
// pause returns how long to wait before the next attempt: minBackoff plus a
// jitter component. When the configured jitter is positive, the component is
// a random value in [0, jitter) drawn from r while holding mu; otherwise the
// configured jitter value is added unchanged.
func (sr *statelessRetryer) pause() time.Duration {
	spread := int64(sr.jitter)
	if spread > 0 {
		// Int63n requires a positive bound, and r is shared, so the draw is
		// serialized through mu.
		sr.mu.Lock()
		spread = sr.r.Int63n(spread)
		sr.mu.Unlock()
	}
	return sr.minBackoff + time.Duration(spread)
}
// Retry decides whether the operation that failed with err should be
// attempted again. It returns the backoff to wait and true when attemptCount
// is still below maxAttempts and retryPredicate considers err retryable;
// otherwise it returns a zero duration and false.
func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) {
	// The attempt budget is checked first so that exhausted items are
	// rejected without consulting the predicate.
	if attemptCount < sr.maxAttempts && retryPredicate(err) {
		return sr.pause(), true
	}
	return 0, false
}