in cmd/node-termination-handler.go [57:257]
func main() {
// Zerolog uses json formatting by default, so change that to a human-readable format instead
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: timeFormat, NoColor: true})
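// Trap SIGTERM so the handler can stop its event loop and finish in-flight drains before exiting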
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM)
defer signal.Stop(signalChan)
nthConfig, err := config.ParseCliArgs()
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse cli args,")
}
if nthConfig.JsonLogging {
log.Logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
}
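// Apply the configured log level; any other value leaves zerolog's global level unchanged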
switch strings.ToLower(nthConfig.LogLevel) {
case "info":
zerolog.SetGlobalLevel(zerolog.InfoLevel)
case "debug":
zerolog.SetGlobalLevel(zerolog.DebugLevel)
case "error":
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
}
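// Fail fast if the webhook notification configuration is invalid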
err = webhook.ValidateWebhookConfig(nthConfig)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Webhook validation failed,")
}
node, err := node.New(nthConfig)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
}
metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,")
}
err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to instantiate probes service,")
}
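// Set up the IMDS client and the in-memory store that tracks interruption events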
imds := ec2metadata.New(nthConfig.MetadataURL, nthConfig.MetadataTries)
interruptionEventStore := interruptioneventstore.New(nthConfig)
nodeMetadata := imds.GetNodeMetadata()
// Populate the aws region if available from node metadata and not already explicitly configured
if nthConfig.AWSRegion == "" && nodeMetadata.Region != "" {
nthConfig.AWSRegion = nodeMetadata.Region
} else if nthConfig.AWSRegion == "" && nthConfig.QueueURL != "" {
nthConfig.AWSRegion = getRegionFromQueueURL(nthConfig.QueueURL)
log.Debug().Str("Retrieved AWS region from queue-url: \"%s\"", nthConfig.AWSRegion)
}
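// Queue-processor (SQS) mode cannot run without knowing the AWS region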
if nthConfig.AWSRegion == "" && nthConfig.EnableSQSTerminationDraining {
nthConfig.Print()
log.Fatal().Msgf("Unable to find the AWS region to process queue events.")
}
recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to create Kubernetes event recorder,")
}
nthConfig.Print()
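// On startup, try to uncordon this node in case it was cordoned for a scheduled maintenance reboot that has since completed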
if nthConfig.EnableScheduledEventDraining {
stopCh := make(chan struct{})
go func() {
time.Sleep(8 * time.Second)
stopCh <- struct{}{}
}()
// Retry every 2 seconds until the 8-second timer above stops the loop (roughly 4 attempts).
err = wait.PollImmediateUntil(2*time.Second, func() (done bool, err error) {
err = handleRebootUncordon(nthConfig.NodeName, interruptionEventStore, *node)
if err != nil {
log.Warn().Err(err).Msgf("Unable to complete the uncordon after reboot workflow on startup, retrying")
return false, nil
}
return true, nil
}, stopCh)
if err != nil {
log.Warn().Err(err).Msgf("All retries failed, unable to complete the uncordon after reboot workflow")
}
}
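// Channels the monitors use to publish new interruption events and cancellations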
interruptionChan := make(chan monitor.InterruptionEvent)
defer close(interruptionChan)
cancelChan := make(chan monitor.InterruptionEvent)
defer close(cancelChan)
monitoringFns := map[string]monitor.Monitor{}
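// Register a monitor for each event source enabled in the configuration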
if nthConfig.EnableSpotInterruptionDraining {
imdsSpotMonitor := spotitn.NewSpotInterruptionMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
monitoringFns[spotITN] = imdsSpotMonitor
}
if nthConfig.EnableScheduledEventDraining {
imdsScheduledEventMonitor := scheduledevent.NewScheduledEventMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
monitoringFns[scheduledMaintenance] = imdsScheduledEventMonitor
}
if nthConfig.EnableRebalanceMonitoring || nthConfig.EnableRebalanceDraining {
imdsRebalanceMonitor := rebalancerecommendation.NewRebalanceRecommendationMonitor(imds, interruptionChan, nthConfig.NodeName)
monitoringFns[rebalanceRecommendation] = imdsRebalanceMonitor
}
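// Queue-processor mode: build an AWS session and poll the configured SQS queue for termination events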
if nthConfig.EnableSQSTerminationDraining {
cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)
sess := session.Must(session.NewSessionWithOptions(session.Options{
Config: *cfg,
SharedConfigState: session.SharedConfigEnable,
}))
creds, err := sess.Config.Credentials.Get()
if err != nil {
log.Fatal().Err(err).Msg("Unable to get AWS credentials")
}
log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName)
sqsMonitor := sqsevent.SQSMonitor{
CheckIfManaged: nthConfig.CheckASGTagBeforeDraining,
ManagedAsgTag: nthConfig.ManagedAsgTag,
AssumeAsgTagPropagation: nthConfig.AssumeAsgTagPropagation,
QueueURL: nthConfig.QueueURL,
InterruptionChan: interruptionChan,
CancelChan: cancelChan,
SQS: sqs.New(sess),
ASG: autoscaling.New(sess),
EC2: ec2.New(sess),
}
monitoringFns[sqsEvents] = sqsMonitor
}
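// Poll each registered monitor every 2 seconds in its own goroutine; repeated identical errors past the threshold trigger a panic to stop NTH rather than keep spinning on a persistent failure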
for _, fn := range monitoringFns {
go func(monitor monitor.Monitor) {
log.Info().Str("event_type", monitor.Kind()).Msg("Started monitoring for events")
var previousErr error
var duplicateErrCount int
for range time.Tick(time.Second * 2) {
err := monitor.Monitor()
if err != nil {
log.Warn().Str("event_type", monitor.Kind()).Err(err).Msg("There was a problem monitoring for events")
metrics.ErrorEventsInc(monitor.Kind())
recorder.Emit(nthConfig.NodeName, observability.Warning, observability.MonitorErrReason, observability.MonitorErrMsgFmt, monitor.Kind())
if previousErr != nil && err.Error() == previousErr.Error() {
duplicateErrCount++
} else {
duplicateErrCount = 0
previousErr = err
}
if duplicateErrCount >= duplicateErrThreshold {
log.Warn().Msg("Stopping NTH - Duplicate Error Threshold hit.")
panic(fmt.Sprintf("%v", err))
}
}
}
}(fn)
}
go watchForInterruptionEvents(interruptionChan, interruptionEventStore)
log.Info().Msg("Started watching for interruption events")
log.Info().Msg("Kubernetes AWS Node Termination Handler has started successfully!")
go watchForCancellationEvents(cancelChan, interruptionEventStore, node, metrics, recorder)
log.Info().Msg("Started watching for event cancellations")
var wg sync.WaitGroup
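// Main processing loop: once per second, hand active interruption events to available workers until a SIGTERM arrives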
MainLoop:
for range time.NewTicker(1 * time.Second).C {
select {
case <-signalChan:
// Exit the interruption event loop if a SIGTERM is received or the channel is closed
break MainLoop
default:
EventLoop:
for event, ok := interruptionEventStore.GetActiveEvent(); ok; event, ok = interruptionEventStore.GetActiveEvent() {
select {
case interruptionEventStore.Workers <- 1:
log.Info().
Str("event-id", event.EventID).
Str("kind", event.Kind).
Str("node-name", event.NodeName).
Str("instance-id", event.InstanceID).
Msg("Requesting instance drain")
event.InProgress = true
wg.Add(1)
recorder.Emit(event.NodeName, observability.Normal, observability.GetReasonForKind(event.Kind), event.Description)
go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics, recorder, &wg)
default:
log.Warn().Msg("all workers busy, waiting")
break EventLoop
}
}
}
}
log.Info().Msg("AWS Node Termination Handler is shutting down")
wg.Wait()
log.Debug().Msg("all event processors finished")
}