func main()

in cmd/node-termination-handler.go [57:257]
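Entry point of the AWS Node Termination Handler: it configures logging, parses CLI arguments, builds the enabled IMDS and/or SQS event monitors, and runs the loop that drains or cordons nodes when interruption events arrive.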


func main() {
	// Zerolog uses JSON formatting by default, so change that to a human-readable console format instead
	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: timeFormat, NoColor: true})

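	// Trap SIGTERM so the event-processing loop at the bottom of main can shut down gracefully.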
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM)
	defer signal.Stop(signalChan)

	nthConfig, err := config.ParseCliArgs()
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to parse cli args,")
	}

	if nthConfig.JsonLogging {
		log.Logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
	}
	switch strings.ToLower(nthConfig.LogLevel) {
	case "info":
		zerolog.SetGlobalLevel(zerolog.InfoLevel)
	case "debug":
		zerolog.SetGlobalLevel(zerolog.DebugLevel)
	case "error":
		zerolog.SetGlobalLevel(zerolog.ErrorLevel)
	}

	err = webhook.ValidateWebhookConfig(nthConfig)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Webhook validation failed,")
	}
	node, err := node.New(nthConfig)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
	}

	metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,")
	}

	err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Unable to instantiate probes service,")
	}

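	// IMDS client used for the node metadata lookup below and by the IMDS-based monitors (spot ITN, scheduled events, rebalance recommendations).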
	imds := ec2metadata.New(nthConfig.MetadataURL, nthConfig.MetadataTries)

	interruptionEventStore := interruptioneventstore.New(nthConfig)
	nodeMetadata := imds.GetNodeMetadata()
	// Populate the AWS region if available from node metadata and not already explicitly configured
	if nthConfig.AWSRegion == "" && nodeMetadata.Region != "" {
		nthConfig.AWSRegion = nodeMetadata.Region
	} else if nthConfig.AWSRegion == "" && nthConfig.QueueURL != "" {
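		// Fall back to deriving the region from the queue URL, e.g. https://sqs.us-east-1.amazonaws.com/... yields us-east-1.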
		nthConfig.AWSRegion = getRegionFromQueueURL(nthConfig.QueueURL)
		log.Debug().Msgf("Retrieved AWS region from queue-url: \"%s\"", nthConfig.AWSRegion)
	}
	if nthConfig.AWSRegion == "" && nthConfig.EnableSQSTerminationDraining {
		nthConfig.Print()
		log.Fatal().Msgf("Unable to find the AWS region to process queue events.")
	}

	recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Unable to create Kubernetes event recorder,")
	}

	nthConfig.Print()

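	// On startup, attempt the uncordon-after-reboot workflow in case this node was previously drained for a scheduled maintenance reboot.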
	if nthConfig.EnableScheduledEventDraining {
		stopCh := make(chan struct{})
		go func() {
			time.Sleep(8 * time.Second)
			stopCh <- struct{}{}
		}()
		// Will retry roughly 4 times at a 2-second interval before stopCh fires (~8 seconds).
		err = wait.PollImmediateUntil(2*time.Second, func() (done bool, err error) {
			err = handleRebootUncordon(nthConfig.NodeName, interruptionEventStore, *node)
			if err != nil {
				log.Warn().Err(err).Msgf("Unable to complete the uncordon after reboot workflow on startup, retrying")
			}
			return false, nil
		}, stopCh)
		if err != nil {
			log.Warn().Err(err).Msgf("All retries failed, unable to complete the uncordon after reboot workflow")
		}
	}

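	// Monitors publish newly detected interruption events on interruptionChan and cancellations of previously seen events on cancelChan.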
	interruptionChan := make(chan monitor.InterruptionEvent)
	defer close(interruptionChan)
	cancelChan := make(chan monitor.InterruptionEvent)
	defer close(cancelChan)

	monitoringFns := map[string]monitor.Monitor{}
	if nthConfig.EnableSpotInterruptionDraining {
		imdsSpotMonitor := spotitn.NewSpotInterruptionMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
		monitoringFns[spotITN] = imdsSpotMonitor
	}
	if nthConfig.EnableScheduledEventDraining {
		imdsScheduledEventMonitor := scheduledevent.NewScheduledEventMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
		monitoringFns[scheduledMaintenance] = imdsScheduledEventMonitor
	}
	if nthConfig.EnableRebalanceMonitoring || nthConfig.EnableRebalanceDraining {
		imdsRebalanceMonitor := rebalancerecommendation.NewRebalanceRecommendationMonitor(imds, interruptionChan, nthConfig.NodeName)
		monitoringFns[rebalanceRecommendation] = imdsRebalanceMonitor
	}
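	// Queue Processor mode: instead of (or in addition to) IMDS, consume interruption events from an SQS queue, which requires an AWS session and credentials.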
	if nthConfig.EnableSQSTerminationDraining {
		cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)
		sess := session.Must(session.NewSessionWithOptions(session.Options{
			Config:            *cfg,
			SharedConfigState: session.SharedConfigEnable,
		}))
		creds, err := sess.Config.Credentials.Get()
		if err != nil {
			log.Fatal().Err(err).Msg("Unable to get AWS credentials")
		}
		log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName)

		sqsMonitor := sqsevent.SQSMonitor{
			CheckIfManaged:          nthConfig.CheckASGTagBeforeDraining,
			ManagedAsgTag:           nthConfig.ManagedAsgTag,
			AssumeAsgTagPropagation: nthConfig.AssumeAsgTagPropagation,
			QueueURL:                nthConfig.QueueURL,
			InterruptionChan:        interruptionChan,
			CancelChan:              cancelChan,
			SQS:                     sqs.New(sess),
			ASG:                     autoscaling.New(sess),
			EC2:                     ec2.New(sess),
		}
		monitoringFns[sqsEvents] = sqsMonitor
	}

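	// Run every enabled monitor in its own goroutine, polling every 2 seconds; if the same error repeats duplicateErrThreshold times in a row, panic to stop the handler.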
	for _, fn := range monitoringFns {
		go func(monitor monitor.Monitor) {
			log.Info().Str("event_type", monitor.Kind()).Msg("Started monitoring for events")
			var previousErr error
			var duplicateErrCount int
			for range time.Tick(time.Second * 2) {
				err := monitor.Monitor()
				if err != nil {
					log.Warn().Str("event_type", monitor.Kind()).Err(err).Msg("There was a problem monitoring for events")
					metrics.ErrorEventsInc(monitor.Kind())
					recorder.Emit(nthConfig.NodeName, observability.Warning, observability.MonitorErrReason, observability.MonitorErrMsgFmt, monitor.Kind())
					if previousErr != nil && err.Error() == previousErr.Error() {
						duplicateErrCount++
					} else {
						duplicateErrCount = 0
						previousErr = err
					}
					if duplicateErrCount >= duplicateErrThreshold {
						log.Warn().Msg("Stopping NTH - Duplicate Error Threshold hit.")
						panic(fmt.Sprintf("%v", err))
					}
				}
			}
		}(fn)
	}

	go watchForInterruptionEvents(interruptionChan, interruptionEventStore)
	log.Info().Msg("Started watching for interruption events")
	log.Info().Msg("Kubernetes AWS Node Termination Handler has started successfully!")

	go watchForCancellationEvents(cancelChan, interruptionEventStore, node, metrics, recorder)
	log.Info().Msg("Started watching for event cancellations")

	var wg sync.WaitGroup

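	// Main processing loop: once a second, hand each active event from the store to a free worker, which drains or cordons the node as needed.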
InterruptionLoop:
	for range time.NewTicker(1 * time.Second).C {
		select {
		case <-signalChan:
			// Exit the interruption loop when a SIGTERM is received or the channel is closed.
			// A labeled break is required: a bare break here would only leave the select statement.
			break InterruptionLoop
		default:
		EventLoop:
			for event, ok := interruptionEventStore.GetActiveEvent(); ok; event, ok = interruptionEventStore.GetActiveEvent() {
				select {
				case interruptionEventStore.Workers <- 1:
					log.Info().
						Str("event-id", event.EventID).
						Str("kind", event.Kind).
						Str("node-name", event.NodeName).
						Str("instance-id", event.InstanceID).
						Msg("Requesting instance drain")
					event.InProgress = true
					wg.Add(1)
					recorder.Emit(event.NodeName, observability.Normal, observability.GetReasonForKind(event.Kind), event.Description)
					go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics, recorder, &wg)
				default:
					log.Warn().Msg("all workers busy, waiting")
					break EventLoop
				}
			}
		}
	}
	log.Info().Msg("AWS Node Termination Handler is shutting down")
	wg.Wait()
	log.Debug().Msg("all event processors finished")
}
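
The monitors collected in monitoringFns are only exercised through two calls above, Monitor() and Kind(). Inferred from that usage, the monitor.Monitor interface presumably looks roughly like the sketch below; the actual definition lives in the monitor package and may differ or carry more methods.

// Sketch only, inferred from the calls monitor.Monitor() and monitor.Kind() in main();
// not the verbatim definition from the monitor package.
type Monitor interface {
	// Monitor performs one polling pass and returns any error encountered.
	Monitor() error
	// Kind names the event source (e.g. spot ITN, scheduled maintenance, SQS events).
	Kind() string
}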