func main()

in cmd/node-termination-handler.go [69:306]


func main() {
	// Zerolog uses json formatting by default, so change that to a human-readable format instead
	log.Logger = log.Output(logging.RoutingLevelWriter{
		Writer:    &zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: timeFormat, NoColor: true},
		ErrWriter: &zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: timeFormat, NoColor: true},
	})

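	// Trap SIGTERM so the main event loop below can shut down gracefully and wait for in-flight drains.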
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM)
	defer signal.Stop(signalChan)

	nthConfig, err := config.ParseCliArgs()
	if err != nil {
		log.Fatal().Err(err).Msg("Failed to parse cli args,")
	}

	if nthConfig.JsonLogging {
		log.Logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
	}
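	// Map the configured log level onto zerolog's global level; values outside info/debug/error leave the level unchanged.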
	switch strings.ToLower(nthConfig.LogLevel) {
	case "info":
		zerolog.SetGlobalLevel(zerolog.InfoLevel)
	case "debug":
		zerolog.SetGlobalLevel(zerolog.DebugLevel)
	case "error":
		zerolog.SetGlobalLevel(zerolog.ErrorLevel)
	}

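	// Route client-go's klog output through zerolog so all components log in one format.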
	klog.SetLogger(zerologr.New(&log.Logger))

	log.Info().Msgf("Using log format version %d", nthConfig.LogFormatVersion)
	if err = logging.SetFormatVersion(nthConfig.LogFormatVersion); err != nil {
		log.Warn().Err(err).Send()
	}
	if err = observability.SetReasonForKindVersion(nthConfig.LogFormatVersion); err != nil {
		log.Warn().Err(err).Send()
	}

	err = webhook.ValidateWebhookConfig(nthConfig)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Webhook validation failed,")
	}

	clusterConfig, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal().Err(err).Msgf("retreiving cluster config")
	}
	clientset, err := kubernetes.NewForConfig(clusterConfig)
	if err != nil {
		log.Fatal().Err(err).Msgf("creating new clientset with config: %v", err)
	}
	node, err := node.New(nthConfig, clientset)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
	}

	metrics, initMetricsErr := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
	if initMetricsErr != nil {
		nthConfig.Print()
		log.Fatal().Err(initMetricsErr).Msg("Unable to instantiate observability metrics,")
	}

	err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Unable to instantiate probes service,")
	}
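	// Queue-processor (SQS) mode and IMDS mode are mutually exclusive: enabling SQS draining disables IMDS polling.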
	imdsDisabled := nthConfig.EnableSQSTerminationDraining

	interruptionEventStore := interruptioneventstore.New(nthConfig)
	var imds *ec2metadata.Service
	var nodeMetadata ec2metadata.NodeMetadata

	if !imdsDisabled {
		imds = ec2metadata.New(nthConfig.MetadataURL, nthConfig.MetadataTries)
		nodeMetadata = imds.GetNodeMetadata()
	}

	// Populate the aws region if available from node metadata and not already explicitly configured
	if nthConfig.AWSRegion == "" && nodeMetadata.Region != "" {
		nthConfig.AWSRegion = nodeMetadata.Region
	} else if nthConfig.AWSRegion == "" && nthConfig.QueueURL != "" {
		nthConfig.AWSRegion = getRegionFromQueueURL(nthConfig.QueueURL)
		log.Debug().Msgf("Retrieved AWS region from queue-url: \"%s\"", nthConfig.AWSRegion)
	}
	if nthConfig.AWSRegion == "" && nthConfig.EnableSQSTerminationDraining {
		nthConfig.Print()
		log.Fatal().Msg("Unable to find the AWS region to process queue events.")
	}

	recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations, clientset)
	if err != nil {
		nthConfig.Print()
		log.Fatal().Err(err).Msg("Unable to create Kubernetes event recorder,")
	}

	nthConfig.Print()

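	// If this node was cordoned for a scheduled reboot that has since completed, uncordon it on startup.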
	if !imdsDisabled && nthConfig.EnableScheduledEventDraining {
		// Retry for up to 8 seconds at 2-second intervals (roughly 4 attempts).
		pollCtx, cancelPollCtx := context.WithTimeout(context.Background(), 8*time.Second)
		err = wait.PollUntilContextCancel(pollCtx, 2*time.Second, true, func(context.Context) (done bool, err error) {
			err = handleRebootUncordon(nthConfig.NodeName, interruptionEventStore, *node)
			if err != nil {
				log.Warn().Err(err).Msgf("Unable to complete the uncordon after reboot workflow on startup, retrying")
				return false, nil
			}
			return true, nil
		})
		if err != nil {
			log.Warn().Err(err).Msgf("All retries failed, unable to complete the uncordon after reboot workflow")
		}
		cancelPollCtx()
	}

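	// Monitors publish new interruption events on interruptionChan and retractions on cancelChan.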
	interruptionChan := make(chan monitor.InterruptionEvent)
	defer close(interruptionChan)
	cancelChan := make(chan monitor.InterruptionEvent)
	defer close(cancelChan)

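	// Register each enabled monitor by kind; each one is polled on its own goroutine below.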
	monitoringFns := map[string]monitor.Monitor{}
	if !imdsDisabled {
		if nthConfig.EnableSpotInterruptionDraining {
			imdsSpotMonitor := spotitn.NewSpotInterruptionMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
			monitoringFns[spotITN] = imdsSpotMonitor
		}
		if nthConfig.EnableASGLifecycleDraining {
			asgLifecycleMonitor := asglifecycle.NewASGLifecycleMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
			monitoringFns[asgLifecycle] = asgLifecycleMonitor
		}
		if nthConfig.EnableScheduledEventDraining {
			imdsScheduledEventMonitor := scheduledevent.NewScheduledEventMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
			monitoringFns[scheduledMaintenance] = imdsScheduledEventMonitor
		}
		if nthConfig.EnableRebalanceMonitoring || nthConfig.EnableRebalanceDraining {
			imdsRebalanceMonitor := rebalancerecommendation.NewRebalanceRecommendationMonitor(imds, interruptionChan, nthConfig.NodeName)
			monitoringFns[rebalanceRecommendation] = imdsRebalanceMonitor
		}
	}
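	// Queue-processor mode: consume ASG lifecycle, spot interruption, and other notifications from an SQS queue.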
	if nthConfig.EnableSQSTerminationDraining {
		cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)
		sess := session.Must(session.NewSessionWithOptions(session.Options{
			Config:            *cfg,
			SharedConfigState: session.SharedConfigEnable,
		}))
		creds, err := sess.Config.Credentials.Get()
		if err != nil {
			log.Fatal().Err(err).Msg("Unable to get AWS credentials")
		}
		log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName)

		ec2Client := ec2.New(sess)

		if initMetricsErr == nil && nthConfig.EnablePrometheus {
			go metrics.InitNodeMetrics(nthConfig, node, ec2Client)
		}

		completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second
		sqsMonitor := sqsevent.SQSMonitor{
			CheckIfManaged:                nthConfig.CheckTagBeforeDraining,
			ManagedTag:                    nthConfig.ManagedTag,
			QueueURL:                      nthConfig.QueueURL,
			InterruptionChan:              interruptionChan,
			CancelChan:                    cancelChan,
			SQS:                           sqsevent.GetSqsClient(sess),
			ASG:                           autoscaling.New(sess),
			EC2:                           ec2Client,
			BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) },
		}
		monitoringFns[sqsEvents] = sqsMonitor
	}

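	// Poll every monitor on a 2-second cadence; if the same error repeats past duplicateErrThreshold, panic so the pod restarts.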
	for _, fn := range monitoringFns {
		go func(monitor monitor.Monitor) {
			logging.VersionedMsgs.MonitoringStarted(monitor.Kind())
			var previousErr error
			var duplicateErrCount int
			for range time.Tick(time.Second * 2) {
				err := monitor.Monitor()
				if err != nil {
					logging.VersionedMsgs.ProblemMonitoringForEvents(monitor.Kind(), err)
					metrics.ErrorEventsInc(monitor.Kind())
					recorder.Emit(nthConfig.NodeName, observability.Warning, observability.MonitorErrReason, observability.MonitorErrMsgFmt, monitor.Kind())
					if previousErr != nil && err.Error() == previousErr.Error() {
						duplicateErrCount++
					} else {
						duplicateErrCount = 0
						previousErr = err
					}
					if duplicateErrCount >= duplicateErrThreshold {
						log.Warn().Msg("Stopping NTH - Duplicate Error Threshold hit.")
						panic(fmt.Sprintf("%v", err))
					}
				}
			}
		}(fn)
	}

	go watchForInterruptionEvents(interruptionChan, interruptionEventStore)
	log.Info().Msg("Started watching for interruption events")
	log.Info().Msg("Kubernetes AWS Node Termination Handler has started successfully!")

	go watchForCancellationEvents(cancelChan, interruptionEventStore, node, metrics, recorder)
	log.Info().Msg("Started watching for event cancellations")

	var wg sync.WaitGroup

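	// Handlers applied to each interruption event, in order: the ASG launch handler, then cordon-and-drain.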
	asgLaunchHandler := launch.New(interruptionEventStore, *node, nthConfig, metrics, recorder, clientset)
	drainCordonHandler := draincordon.New(interruptionEventStore, *node, nthConfig, nodeMetadata, metrics, recorder)

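	// Main processing loop: once per second, drain active interruption events to available workers.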
MainLoop:
	for range time.NewTicker(1 * time.Second).C {
		select {
		case <-signalChan:
			// Exit the event loop if a SIGTERM is received or the channel is closed
			// (a bare break would only exit the select, not the for loop)
			break MainLoop
		default:
		EventLoop:
			for event, ok := interruptionEventStore.GetActiveEvent(); ok; event, ok = interruptionEventStore.GetActiveEvent() {
				select {
				case interruptionEventStore.Workers <- 1:
					logging.VersionedMsgs.RequestingInstanceDrain(event)
					event.InProgress = true
					wg.Add(1)
					recorder.Emit(event.NodeName, observability.Normal, observability.GetReasonForKind(event.Kind, event.Monitor), event.Description)
					go processInterruptionEvent(interruptionEventStore, event, []interruptionEventHandler{asgLaunchHandler, drainCordonHandler}, &wg)
				default:
					log.Warn().Msg("all workers busy, waiting")
					break EventLoop
				}
			}
		}
	}
	log.Info().Msg("AWS Node Termination Handler is shutting down")
	wg.Wait()
	log.Debug().Msg("all event processors finished")
}
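
The polling goroutines above only require that each value registered in monitoringFns implement the monitor.Monitor interface, whose surface — judging from the calls in this function — is just Monitor() error and Kind() string. Below is a minimal, hypothetical sketch of a custom event source plugging into the same channels; heartbeatMonitor and its check field are invented for illustration, and only the InterruptionEvent fields referenced above (Kind, Monitor, NodeName, Description) are set:

// heartbeatMonitor is a hypothetical monitor.Monitor implementation;
// it is not part of the handler.
type heartbeatMonitor struct {
	interruptionChan chan<- monitor.InterruptionEvent
	nodeName         string
	check            func() error // assumed external liveness probe
}

// Kind identifies this monitor in logs, metrics, and Kubernetes events.
func (h heartbeatMonitor) Kind() string { return "HEARTBEAT_MONITOR" }

// Monitor runs one polling iteration; main() invokes it every 2 seconds
// and treats a non-nil return as a monitoring failure.
func (h heartbeatMonitor) Monitor() error {
	if err := h.check(); err == nil {
		return nil // healthy; nothing to report
	}
	h.interruptionChan <- monitor.InterruptionEvent{
		Kind:        h.Kind(),
		Monitor:     h.Kind(),
		NodeName:    h.nodeName,
		Description: "external heartbeat failed",
	}
	return nil
}

Registering it would be one more entry in the map built above, e.g. monitoringFns["heartbeat"] = heartbeatMonitor{interruptionChan: interruptionChan, nodeName: nthConfig.NodeName, check: someProbe}.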