core/app/registrar/registrar.go (160 lines of code) (raw):
package registrar
import (
"context"
"math"
"math/rand"
"runtime/debug"
"sync"
"time"
"github.com/aws/amazon-ssm-agent/agent/log"
"github.com/aws/amazon-ssm-agent/common/identity"
agentCtx "github.com/aws/amazon-ssm-agent/core/app/context"
)
const (
minSleepSecondsBeforeRetry = 15
jitterFactor = 0.2
)
func getBackoffRetryJitterSleepDuration(retryCount int) time.Duration {
// sleep for at least 15 seconds
expBackoff := math.Max(minSleepSecondsBeforeRetry, math.Pow(2, float64(retryCount)))
return time.Duration(int(expBackoff)+rand.Intn(int(math.Ceil(expBackoff*jitterFactor)))) * time.Second
}
type IRetryableRegistrar interface {
Start() error
Stop()
GetRegistrationAttemptedChan() chan struct{}
}
type RetryableRegistrar struct {
log log.T
registrationAttemptedChan chan struct{}
stopRegistrarChan chan struct{}
identityRegistrar identity.Registrar
timeAfterFunc func(time.Duration) <-chan time.Time
isRegistrarRunning bool
isRegistrarRunningLock *sync.RWMutex
}
func NewRetryableRegistrar(agentCtx agentCtx.ICoreAgentContext) *RetryableRegistrar {
log := agentCtx.Log().WithContext("[Registrar]")
log.Debug("initializing registrar")
// Cast to innerIdentityGetter interface that defined getInner
innerGetter, ok := castToIdentityInner(agentCtx)
if !ok {
log.Errorf("malformed identity")
return nil
}
var identityRegistrar identity.Registrar
if identityRegistrar, ok = innerGetter.GetInner().(identity.Registrar); !ok {
log.Debug("identity does not leverage auto-registration")
return nil
}
return &RetryableRegistrar{
log: log,
identityRegistrar: identityRegistrar,
registrationAttemptedChan: make(chan struct{}, 1),
stopRegistrarChan: make(chan struct{}),
timeAfterFunc: time.After,
isRegistrarRunning: false,
isRegistrarRunningLock: &sync.RWMutex{},
}
}
var castToIdentityInner = func(agentCtx agentCtx.ICoreAgentContext) (identity.IInnerIdentityGetter, bool) {
innerGetter, ok := agentCtx.Identity().(identity.IInnerIdentityGetter)
return innerGetter, ok
}
var contextWithCancel = context.WithCancel
func (r *RetryableRegistrar) Start() error {
r.log.Info("Starting registrar module")
r.setIsRegistrarRunning(true)
go r.RegisterWithRetry()
return nil
}
func (r *RetryableRegistrar) RegisterWithRetry() {
defer func() {
if err := recover(); err != nil {
r.log.Errorf("registrar panic: %v", err)
r.log.Errorf("Stacktrace:\n%s", debug.Stack())
r.log.Flush()
r.setIsRegistrarRunning(false)
select {
case <-r.registrationAttemptedChan:
//channel open, write to channel to unblock and close
r.registrationAttemptedChan <- struct{}{}
close(r.registrationAttemptedChan)
default:
}
}
}()
retryCount := 0
ctx, cancel := contextWithCancel(context.Background())
defer cancel()
for {
errChan := make(chan error, 1)
go func() {
defer func() {
if err := recover(); err != nil {
r.log.Errorf("identity register panic: %v", err)
r.log.Errorf("Stacktrace:\n%s", debug.Stack())
r.log.Flush()
}
// Close errChan if still open
select {
case <-errChan:
default:
close(errChan)
}
}()
errChan <- r.identityRegistrar.Register(ctx)
}()
select {
case err := <-errChan:
if retryCount == 0 {
r.registrationAttemptedChan <- struct{}{}
close(r.registrationAttemptedChan)
}
if err != nil {
r.log.Errorf("failed to register identity: %v", err)
} else {
r.setIsRegistrarRunning(false)
return
}
case <-r.stopRegistrarChan:
cancel()
r.log.Info("Stopping registrar")
r.setIsRegistrarRunning(false)
r.log.Flush()
return
}
// Default sleep duration for non-aws errors
sleepDuration := getBackoffRetryJitterSleepDuration(retryCount)
// Max retry count is 16, which will sleep for about 18-22 hours
if retryCount < 16 {
retryCount++
}
r.log.Infof("sleeping for %v minutes before retrying registration", sleepDuration.Minutes())
select {
case <-r.stopRegistrarChan:
cancel()
r.log.Info("Stopping registrar")
r.setIsRegistrarRunning(false)
r.log.Flush()
return
case <-r.timeAfterFunc(sleepDuration):
}
}
}
func (r *RetryableRegistrar) setIsRegistrarRunning(isRegistrarRunning bool) {
r.isRegistrarRunningLock.Lock()
defer r.isRegistrarRunningLock.Unlock()
r.isRegistrarRunning = isRegistrarRunning
}
func (r *RetryableRegistrar) getIsRegistrarRunning() bool {
r.isRegistrarRunningLock.RLock()
defer r.isRegistrarRunningLock.RUnlock()
return r.isRegistrarRunning
}
// GetRegistrationAttemptedChan returns a channel that is written to and closed
// after registration is attempted or has succeeded
func (r *RetryableRegistrar) GetRegistrationAttemptedChan() chan struct{} {
return r.registrationAttemptedChan
}
func (r *RetryableRegistrar) Stop() {
if !r.getIsRegistrarRunning() {
r.log.Info("Registrar is already stopped")
r.log.Flush()
return
}
r.log.Info("Sending signal to stop registrar")
r.stopRegistrarChan <- struct{}{}
}