plugins/outputs/cloudwatchlogs/internal/pusher/retry.go (81 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package pusher
import (
"errors"
"math/rand"
"net"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)
const (
	// maxRetryDelay is the delay used by both retry strategies once their
	// exponential backoff retries have been used up.
	maxRetryDelay = time.Minute

	// baseRetryDelayShort is the starting delay of the short retry strategy.
	baseRetryDelayShort = 200 * time.Millisecond
	// numBackoffRetriesShort is how many consecutive retries of the short
	// strategy back off exponentially before switching to maxRetryDelay.
	numBackoffRetriesShort = 5

	// baseRetryDelayLong is the starting delay of the long retry strategy.
	baseRetryDelayLong = 2 * time.Second
	// numBackoffRetriesLong is how many consecutive retries of the long
	// strategy back off exponentially before switching to maxRetryDelay.
	numBackoffRetriesLong = 2
)
// retryWaitStrategy identifies which backoff schedule to use when retrying a
// failed request.
type retryWaitStrategy int

const (
	// retryShort selects the short retry strategy; it is the zero value.
	retryShort = retryWaitStrategy(iota)
	// retryLong selects the long retry strategy.
	retryLong
)
// retryWaitShort computes how long to wait before the next attempt under the
// short retry strategy. It delegates to retryWait with the short strategy's
// base delay and backoff-retry limit.
func retryWaitShort(retryCount int) time.Duration {
	wait := retryWait(baseRetryDelayShort, numBackoffRetriesShort, retryCount)
	return wait
}
// retryWaitLong computes how long to wait before the next attempt under the
// long retry strategy, which is meant for errors that should not be retried
// too quickly. It delegates to retryWait with the long strategy's base delay
// and backoff-retry limit.
func retryWaitLong(retryCount int) time.Duration {
	wait := retryWait(baseRetryDelayLong, numBackoffRetriesLong, retryCount)
	return wait
}
// retryWait returns the jittered delay to wait before the next retry.
//
// While retryCount is below maxBackoffRetries the delay grows exponentially as
// baseRetryDelay * 2^retryCount, capped at maxRetryDelay. Once retryCount
// reaches maxBackoffRetries the delay is maxRetryDelay. A negative retryCount
// is treated as 0. The selected delay is passed through withJitter before it
// is returned.
func retryWait(baseRetryDelay time.Duration, maxBackoffRetries int, retryCount int) time.Duration {
	// Shifting by a negative count panics at run time, so clamp it first.
	if retryCount < 0 {
		retryCount = 0
	}

	d := maxRetryDelay
	if retryCount < maxBackoffRetries {
		d = baseRetryDelay * time.Duration(1<<int64(retryCount))
		// Keep maxRetryDelay a true upper bound regardless of the base delay
		// and backoff limit the caller passes in.
		if d > maxRetryDelay {
			d = maxRetryDelay
		}
	}
	return withJitter(d)
}
// withJitter returns a random duration in the half-open range [d/2, d).
//
// When d/2 is not positive (d is below 2ns, zero, or negative) there is no
// room for jitter and d is returned unchanged; rand.Int63n would panic on a
// non-positive argument.
func withJitter(d time.Duration) time.Duration {
	half := int64(d / 2)
	if half <= 0 {
		return d
	}
	return time.Duration(rand.Int63n(half) + half) // nolint:gosec
}
// chooseRetryWaitStrategy picks the retry strategy to apply after a
// PutLogEvents API call fails with err.
//
// retryLong is returned for failures where retrying too quickly could put
// excessive strain on the backend servers:
//   - connection timeout, connection reset and connection refused errors
//   - errors that request.IsErrorThrottle reports as throttling
//   - AWS error codes cloudwatchlogs.ErrCodeServiceUnavailableException,
//     cloudwatchlogs.ErrCodeThrottlingException, "RequestTimeout" and
//     request.ErrCodeResponseTimeout
//   - AWS request failures with HTTP status 500 (internal failure) or
//     503 (service unavailable)
//
// Every other error gets retryShort.
func chooseRetryWaitStrategy(err error) retryWaitStrategy {
	// Transport-level failures and SDK-recognised throttling; the checks run in
	// order and stop at the first match.
	switch {
	case isErrConnectionTimeout(err),
		isErrConnectionReset(err),
		isErrConnectionRefused(err),
		request.IsErrorThrottle(err):
		return retryLong
	}

	// Anything that is not an AWS error has no code or status to inspect.
	var sdkErr awserr.Error
	if !errors.As(err, &sdkErr) {
		return retryShort
	}

	// AWS error codes.
	code := sdkErr.Code()
	if code == cloudwatchlogs.ErrCodeServiceUnavailableException ||
		code == cloudwatchlogs.ErrCodeThrottlingException ||
		code == "RequestTimeout" ||
		code == request.ErrCodeResponseTimeout {
		return retryLong
	}

	// HTTP status codes, when the error carries one.
	var failure awserr.RequestFailure
	if errors.As(err, &failure) {
		if status := failure.StatusCode(); status == 500 || status == 503 {
			return retryLong
		}
	}

	return retryShort
}
// isErrConnectionTimeout reports whether err, or an error it wraps, is a
// net.Error whose Timeout method returns true.
func isErrConnectionTimeout(err error) bool {
	var timeoutErr net.Error
	if !errors.As(err, &timeoutErr) {
		return false
	}
	return timeoutErr.Timeout()
}
// isErrConnectionReset reports whether err's message indicates a reset or
// closed connection: it contains "use of closed network connection",
// "connection reset" or "broken pipe".
//
// Messages containing "read: connection reset" are excluded and yield false.
// A nil err yields false.
func isErrConnectionReset(err error) bool {
	// Calling Error on a nil error would panic.
	if err == nil {
		return false
	}

	msg := err.Error()
	// NOTE(review): the read-side exclusion looks like it mirrors the AWS SDK's
	// own connection-reset check — confirm before changing it.
	if strings.Contains(msg, "read: connection reset") {
		return false
	}

	return strings.Contains(msg, "use of closed network connection") ||
		strings.Contains(msg, "connection reset") ||
		strings.Contains(msg, "broken pipe")
}
// isErrConnectionRefused reports whether err's message contains
// "connection refused". A nil err yields false.
func isErrConnectionRefused(err error) bool {
	// Calling Error on a nil error would panic.
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "connection refused")
}