cmd/aws-k8s-agent/main.go (112 lines of code) (raw):

// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"). You may // not use this file except in compliance with the License. A copy of the // License is located at // // http://aws.amazon.com/apache2.0/ // // or in the "license" file accompanying this file. This file is distributed // on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either // express or implied. See the License for the specific language governing // permissions and limitations under the License. // The aws-node ipam daemon binary package main import ( "context" "os" "time" "github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd" "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/eventrecorder" "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" "github.com/aws/amazon-vpc-cni-k8s/pkg/version" "github.com/aws/amazon-vpc-cni-k8s/utils" metrics "github.com/aws/amazon-vpc-cni-k8s/utils/prometheusmetrics" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" ) const ( appName = "aws-node" // metricsPort is the port for prometheus metrics metricsPort = 61678 // Environment variable to disable the metrics endpoint on 61678 envDisableMetrics = "DISABLE_METRICS" // Environment variable to disable the IPAMD introspection endpoint on 61679 envDisableIntrospection = "DISABLE_INTROSPECTION" restCfgTimeout = 5 * time.Second pollInterval = 5 * time.Second pollTimeout = 30 * time.Second ) func main() { os.Exit(_main()) } // startBackgroundAPIServerCheck checks API connectivity in the background func startBackgroundAPIServerCheck(ipamContext *ipamd.IPAMContext) { go func() { log := logger.Get() log.Info("Starting background API server connectivity check...") // Create a new client for API server check restCfg, err := k8sapi.GetRestConfig() if err != nil { log.Errorf("Failed to get REST config for background API check: %v", err) return } restCfg.Timeout = restCfgTimeout clientSet, err := kubernetes.NewForConfig(restCfg) if err != nil { log.Errorf("Failed to create k8s client for background API check: %v", err) return } // Keep checking until connection is established wait.PollUntilContextCancel(context.Background(), pollInterval, true, func(ctx context.Context) (bool, error) { version, err := clientSet.Discovery().ServerVersion() if err == nil { log.Infof("API server connectivity established in background! Cluster Version is: %s", version.GitVersion) // Update IPAM context with new API server connectivity ipamContext.SetAPIServerConnectivity(true) // Exit the goroutine after successful connection log.Info("Background API server check completed successfully") return true, nil } log.Debugf("Still waiting for API server connectivity in background: %v", err) return false, nil }) }() } func _main() int { // Do not add anything before initializing logger log := logger.Get() log.Infof("Starting L-IPAMD %s ...", version.Version) version.RegisterMetric() enabledPodEni := ipamd.EnablePodENI() enabledCustomNetwork := ipamd.UseCustomNetworkCfg() enabledPodAnnotation := ipamd.EnablePodIPAnnotation() withApiServer := false // Check API Server Connectivity if enabledPodEni || enabledCustomNetwork || enabledPodAnnotation { log.Info("SGP, custom networking or pod annotation feature is in use, waiting for API server connectivity to start IPAMD") if err := k8sapi.CheckAPIServerConnectivity(); err != nil { log.Errorf("Failed to check API server connectivity: %s", err) return 1 } else { log.Info("API server connectivity established.") withApiServer = true } } else { log.Infof("Waiting to connect API server for upto %s...", pollTimeout) // Try a quick check first if err := k8sapi.CheckAPIServerConnectivityWithTimeout(pollInterval, pollTimeout); err != nil { log.Warn("Proceeding without API server connectivity, will run background API server connectivity check") withApiServer = false } else { log.Info("API server connectivity established.") withApiServer = true } } // Create Kubernetes client for API server requests k8sClient, err := k8sapi.CreateKubeClient(appName) if err != nil { log.Errorf("Failed to create kube client: %s", err) } // Create EventRecorder for use by IPAMD if err := eventrecorder.Init(k8sClient, withApiServer); err != nil { log.Errorf("Failed to create event recorder: %s", err) log.Warn("Skipping event recorder initialization") } ipamContext, err := ipamd.New(k8sClient, withApiServer) if err != nil { log.Errorf("Initialization failure: %v", err) return 1 } // If not connected to API server yet, start background checks if !withApiServer { startBackgroundAPIServerCheck(ipamContext) } // Pool manager go ipamContext.StartNodeIPPoolManager() if !utils.GetBoolAsStringEnvVar(envDisableMetrics, false) { // Prometheus metrics go metrics.ServeMetrics(metricsPort) } // CNI introspection endpoints if !utils.GetBoolAsStringEnvVar(envDisableIntrospection, false) { go ipamContext.ServeIntrospection() } // Start the RPC listener err = ipamContext.RunRPCHandler(version.Version) if err != nil { log.Errorf("Failed to set up gRPC handler: %v", err) return 1 } return 0 }