cns/service/main.go (1,403 lines)

// Copyright 2017 Microsoft. All rights reserved. // MIT License package main import ( "bytes" "context" "encoding/json" "fmt" "io/fs" "net/http" "os" "os/signal" "runtime" "strconv" "strings" "syscall" "time" "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/cns" cnsclient "github.com/Azure/azure-container-networking/cns/client" cnscli "github.com/Azure/azure-container-networking/cns/cmd/cli" "github.com/Azure/azure-container-networking/cns/cniconflist" "github.com/Azure/azure-container-networking/cns/cnireconciler" "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/configuration" "github.com/Azure/azure-container-networking/cns/deviceplugin" "github.com/Azure/azure-container-networking/cns/endpointmanager" "github.com/Azure/azure-container-networking/cns/fsnotify" "github.com/Azure/azure-container-networking/cns/grpc" "github.com/Azure/azure-container-networking/cns/healthserver" "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/imds" "github.com/Azure/azure-container-networking/cns/ipampool" "github.com/Azure/azure-container-networking/cns/ipampool/metrics" ipampoolv2 "github.com/Azure/azure-container-networking/cns/ipampool/v2" cssctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/clustersubnetstate" mtpncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/multitenantpodnetworkconfig" nncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/nodenetworkconfig" podctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/pod" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/metric" "github.com/Azure/azure-container-networking/cns/middlewares" "github.com/Azure/azure-container-networking/cns/multitenantcontroller" "github.com/Azure/azure-container-networking/cns/multitenantcontroller/multitenantoperator" "github.com/Azure/azure-container-networking/cns/restserver" restserverv2 "github.com/Azure/azure-container-networking/cns/restserver/v2" cnstypes "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/cns/wireserver" acn "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/crd" "github.com/Azure/azure-container-networking/crd/clustersubnetstate" cssv1alpha1 "github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1" "github.com/Azure/azure-container-networking/crd/multitenancy" mtv1alpha1 "github.com/Azure/azure-container-networking/crd/multitenancy/api/v1alpha1" "github.com/Azure/azure-container-networking/crd/nodenetworkconfig" "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" acnfs "github.com/Azure/azure-container-networking/internal/fs" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/nmagent" "github.com/Azure/azure-container-networking/platform" "github.com/Azure/azure-container-networking/processlock" localtls "github.com/Azure/azure-container-networking/server/tls" "github.com/Azure/azure-container-networking/store" "github.com/Azure/azure-container-networking/telemetry" "github.com/avast/retry-go/v4" "github.com/google/go-cmp/cmp" "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" kuberuntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/healthz" ctrlzap "sigs.k8s.io/controller-runtime/pkg/log/zap" ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics/server" ) const ( // Service name. name = "azure-cns" pluginName = "azure-vnet" endpointStoreName = "azure-endpoints" endpointStoreLocationLinux = "/var/run/azure-cns/" endpointStoreLocationWindows = "/k/azurecns/" defaultCNINetworkConfigFileName = "10-azure.conflist" dncApiVersion = "?api-version=2018-03-01" poolIPAMRefreshRateInMilliseconds = 1000 // 720 * acn.FiveSeconds sec sleeps = 1Hr maxRetryNodeRegister = 720 initCNSInitalDelay = 10 * time.Second // envVarEnableCNIConflistGeneration enables cni conflist generation if set (value doesn't matter) envVarEnableCNIConflistGeneration = "CNS_ENABLE_CNI_CONFLIST_GENERATION" cnsReqTimeout = 15 * time.Second defaultLocalServerIP = "localhost" defaultLocalServerPort = "10090" defaultDevicePluginRetryInterval = 2 * time.Second defaultNodeInfoCRDPollInterval = 5 * time.Second defaultDevicePluginMaxRetryCount = 5 initialVnetNICCount = 0 initialIBNICCount = 0 ) type cniConflistScenario string const ( scenarioV4Overlay cniConflistScenario = "v4overlay" scenarioDualStackOverlay cniConflistScenario = "dualStackOverlay" scenarioOverlay cniConflistScenario = "overlay" scenarioCilium cniConflistScenario = "cilium" scenarioSWIFT cniConflistScenario = "swift" ) var ( rootCtx context.Context rootErrCh chan error z *zap.Logger ) // Version is populated by make during build. var version string // endpointStorePath is used to create the path for EdnpointState file. var endpointStorePath string // Command line arguments for CNS. 
var args = acn.ArgumentList{ { Name: acn.OptEnvironment, Shorthand: acn.OptEnvironmentAlias, Description: "Set the operating environment", Type: "string", DefaultValue: acn.OptEnvironmentAzure, ValueMap: map[string]interface{}{ acn.OptEnvironmentAzure: 0, acn.OptEnvironmentMAS: 0, acn.OptEnvironmentFileIpam: 0, }, }, { Name: acn.OptAPIServerURL, Shorthand: acn.OptAPIServerURLAlias, Description: "Set the API server URL", Type: "string", DefaultValue: "", }, { Name: acn.OptLogLevel, Shorthand: acn.OptLogLevelAlias, Description: "Set the logging level", Type: "int", DefaultValue: acn.OptLogLevelInfo, ValueMap: map[string]interface{}{ acn.OptLogLevelInfo: log.LevelInfo, acn.OptLogLevelDebug: log.LevelDebug, }, }, { Name: acn.OptLogTarget, Shorthand: acn.OptLogTargetAlias, Description: "Set the logging target", Type: "int", DefaultValue: acn.OptLogTargetFile, ValueMap: map[string]interface{}{ acn.OptLogTargetSyslog: log.TargetSyslog, acn.OptLogTargetStderr: log.TargetStderr, acn.OptLogTargetFile: log.TargetLogfile, acn.OptLogStdout: log.TargetStdout, acn.OptLogMultiWrite: log.TargetStdOutAndLogFile, }, }, { Name: acn.OptLogLocation, Shorthand: acn.OptLogLocationAlias, Description: "Set the directory location where logs will be saved", Type: "string", DefaultValue: "", }, { Name: acn.OptIpamQueryUrl, Shorthand: acn.OptIpamQueryUrlAlias, Description: "Set the IPAM query URL", Type: "string", DefaultValue: "", }, { Name: acn.OptIpamQueryInterval, Shorthand: acn.OptIpamQueryIntervalAlias, Description: "Set the IPAM plugin query interval", Type: "int", DefaultValue: "", }, { Name: acn.OptCnsURL, Shorthand: acn.OptCnsURLAlias, Description: "Set the URL for CNS to listen on", Type: "string", DefaultValue: "", }, { Name: acn.OptCnsPort, Shorthand: acn.OptCnsPortAlias, Description: "Set the URL port for CNS to listen on", Type: "string", DefaultValue: "", }, { Name: acn.OptVersion, Shorthand: acn.OptVersionAlias, Description: "Print version information", Type: "bool", DefaultValue: false, }, { Name: acn.OptStoreFileLocation, Shorthand: acn.OptStoreFileLocationAlias, Description: "Set store file absolute path", Type: "string", DefaultValue: platform.CNMRuntimePath, }, { Name: acn.OptNetPluginPath, Shorthand: acn.OptNetPluginPathAlias, Description: "Set network plugin binary absolute path to parent (of azure-vnet and azure-vnet-ipam)", Type: "string", DefaultValue: platform.K8SCNIRuntimePath, }, { Name: acn.OptNetPluginConfigFile, Shorthand: acn.OptNetPluginConfigFileAlias, Description: "Set network plugin configuration file absolute path", Type: "string", DefaultValue: platform.K8SNetConfigPath + string(os.PathSeparator) + defaultCNINetworkConfigFileName, }, { Name: acn.OptCreateDefaultExtNetworkType, Shorthand: acn.OptCreateDefaultExtNetworkTypeAlias, Description: "Create default external network for windows platform with the specified type (l2bridge)", Type: "string", DefaultValue: "", }, { Name: acn.OptTelemetry, Shorthand: acn.OptTelemetryAlias, Description: "Set to false to disable telemetry. 
This is deprecated in favor of cns_config.json", Type: "bool", DefaultValue: true, }, { Name: acn.OptHttpConnectionTimeout, Shorthand: acn.OptHttpConnectionTimeoutAlias, Description: "Set HTTP connection timeout in seconds to be used by http client in CNS", Type: "int", DefaultValue: "5", }, { Name: acn.OptHttpResponseHeaderTimeout, Shorthand: acn.OptHttpResponseHeaderTimeoutAlias, Description: "Set HTTP response header timeout in seconds to be used by http client in CNS", Type: "int", DefaultValue: "120", }, { Name: acn.OptPrivateEndpoint, Shorthand: acn.OptPrivateEndpointAlias, Description: "Set private endpoint", Type: "string", DefaultValue: "", }, { Name: acn.OptInfrastructureNetworkID, Shorthand: acn.OptInfrastructureNetworkIDAlias, Description: "Set infrastructure network ID", Type: "string", DefaultValue: "", }, { Name: acn.OptNodeID, Shorthand: acn.OptNodeIDAlias, Description: "Set node name/ID", Type: "string", DefaultValue: "", }, { Name: acn.OptManaged, Shorthand: acn.OptManagedAlias, Description: "Set to true to enable managed mode. This is deprecated in favor of cns_config.json", Type: "bool", DefaultValue: false, }, { Name: acn.OptDebugCmd, Shorthand: acn.OptDebugCmdAlias, Description: "Debug flag to retrieve IPconfigs, available values: assigned, available, all", Type: "string", DefaultValue: "", }, { Name: acn.OptDebugArg, Shorthand: acn.OptDebugArgAlias, Description: "Argument flag to be paired with the 'debugcmd' flag.", Type: "string", DefaultValue: "", }, { Name: acn.OptCNSConfigPath, Shorthand: acn.OptCNSConfigPathAlias, Description: "Path to cns config file", Type: "string", DefaultValue: "", }, { Name: acn.OptTelemetryService, Shorthand: acn.OptTelemetryServiceAlias, Description: "Flag to start telemetry service to receive telemetry events from CNI. Default, disabled.", Type: "bool", DefaultValue: false, }, { Name: acn.OptCNIConflistFilepath, Shorthand: acn.OptCNIConflistFilepathAlias, Description: "Filepath to write CNI conflist when CNI conflist generation is enabled", Type: "string", DefaultValue: "", }, { Name: acn.OptCNIConflistScenario, Shorthand: acn.OptCNIConflistScenarioAlias, Description: "Scenario to generate CNI conflist for", Type: "string", DefaultValue: "", }, } // init() is executed before main() whenever this package is imported // to do pre-run setup of things like exit signal handling and building // the root context. func init() { var cancel context.CancelFunc rootCtx, cancel = context.WithCancel(context.Background()) rootErrCh = make(chan error, 1) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) // Fill EndpointStatePath based on the platform if endpointStorePath = os.Getenv("CNSStoreFilePath"); endpointStorePath == "" { if runtime.GOOS == "windows" { endpointStorePath = endpointStoreLocationWindows } else { endpointStorePath = endpointStoreLocationLinux } } go func() { // Wait until receiving a signal. select { case sig := <-sigCh: logger.Errorf("caught exit signal %v, exiting", sig) case err := <-rootErrCh: logger.Errorf("unhandled error %v, exiting", err) } cancel() }() } // Prints description and version information. func printVersion() { fmt.Printf("Azure Container Network Service\n") fmt.Printf("Version %v\n", version) } // NodeInterrogator is functionality necessary to read information about nodes. // It is intended to be strictly read-only. 
type NodeInterrogator interface {
	SupportedAPIs(context.Context) ([]string, error)
}

type httpDoer interface {
	Do(req *http.Request) (*http.Response, error)
}

// registerNode tries to register the node with DNC when CNS is started in managed DNC mode.
func registerNode(ctx context.Context, httpClient httpDoer, httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string, ni NodeInterrogator) error {
	logger.Printf("[Azure CNS] Registering node %s with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP)

	var (
		numCPU              = runtime.NumCPU()
		url                 = fmt.Sprintf(acn.RegisterNodeURLFmt, dncEP, infraVnet, nodeID, dncApiVersion)
		nodeRegisterRequest cns.NodeRegisterRequest
	)
	nodeRegisterRequest.NumCores = numCPU
	supportedApis, retErr := ni.SupportedAPIs(context.TODO())

	if retErr != nil {
		return errors.Wrap(retErr, fmt.Sprintf("[Azure CNS] Failed to retrieve SupportedApis from NMagent of node %s with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP))
	}

	// To avoid any null-pointer de-referencing errors.
	if supportedApis == nil {
		supportedApis = []string{}
	}

	nodeRegisterRequest.NmAgentSupportedApis = supportedApis

	// CNS tries to register Node for maximum of an hour.
	err := retry.Do(func() error {
		return errors.Wrap(sendRegisterNodeRequest(ctx, httpClient, httpRestService, nodeRegisterRequest, url), "failed to sendRegisterNodeRequest")
	}, retry.Delay(acn.FiveSeconds), retry.Attempts(maxRetryNodeRegister), retry.DelayType(retry.FixedDelay))

	return errors.Wrap(err, fmt.Sprintf("[Azure CNS] Failed to register node %s after maximum retries for an hour with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP))
}

// sendRegisterNodeRequest sends a single node registration request to DNC and returns an error on failure so the caller can retry.
func sendRegisterNodeRequest(ctx context.Context, httpClient httpDoer, httpRestService cns.HTTPService, nodeRegisterRequest cns.NodeRegisterRequest, registerURL string) error {
	var body bytes.Buffer
	err := json.NewEncoder(&body).Encode(nodeRegisterRequest)
	if err != nil {
		logger.Errorf("Failed to register node while encoding json failed with non-retryable err %v", err)
		return errors.Wrap(retry.Unrecoverable(err), "failed to sendRegisterNodeRequest")
	}

	request, err := http.NewRequestWithContext(ctx, http.MethodPost, registerURL, &body)
	if err != nil {
		return errors.Wrap(err, "failed to build request")
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := httpClient.Do(request)
	if err != nil {
		return errors.Wrap(err, "http request failed")
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		err = fmt.Errorf("[Azure CNS] Failed to register node, DNC replied with http status code %s", strconv.Itoa(response.StatusCode))
		logger.Errorf(err.Error())
		return errors.Wrap(err, "failed to sendRegisterNodeRequest")
	}

	var req cns.SetOrchestratorTypeRequest
	err = json.NewDecoder(response.Body).Decode(&req)
	if err != nil {
		logger.Errorf("decoding Node Register response json failed with err %v", err)
		return errors.Wrap(err, "failed to sendRegisterNodeRequest")
	}
	httpRestService.SetNodeOrchestrator(&req)

	logger.Printf("[Azure CNS] Node Registered")
	return nil
}

func startTelemetryService(ctx context.Context) {
	var config aitelemetry.AIConfig

	tb := telemetry.NewTelemetryBuffer(nil)
	err := tb.CreateAITelemetryHandle(config, false, false, false)
	if err != nil {
		logger.Errorf("AI telemetry handle creation failed: %v", err)
		return
	}

	tbtemp := telemetry.NewTelemetryBuffer(nil) //nolint:errcheck // best effort to cleanup leaked
pipe/socket before start tbtemp.Cleanup(telemetry.FdName) err = tb.StartServer() logger.Printf("Telemetry service for CNI started") if err != nil { logger.Errorf("Telemetry service failed to start: %v", err) return } tb.PushData(ctx) } // Main is the entry point for CNS. func main() { // Initialize and parse command line arguments. acn.ParseArgs(&args, printVersion) cniPath := acn.GetArg(acn.OptNetPluginPath).(string) cniConfigFile := acn.GetArg(acn.OptNetPluginConfigFile).(string) cnsURL := acn.GetArg(acn.OptCnsURL).(string) cnsPort := acn.GetArg(acn.OptCnsPort).(string) logLevel := acn.GetArg(acn.OptLogLevel).(int) logTarget := acn.GetArg(acn.OptLogTarget).(int) logDirectory := acn.GetArg(acn.OptLogLocation).(string) vers := acn.GetArg(acn.OptVersion).(bool) createDefaultExtNetworkType := acn.GetArg(acn.OptCreateDefaultExtNetworkType).(string) telemetryEnabled := acn.GetArg(acn.OptTelemetry).(bool) httpConnectionTimeout := acn.GetArg(acn.OptHttpConnectionTimeout).(int) httpResponseHeaderTimeout := acn.GetArg(acn.OptHttpResponseHeaderTimeout).(int) storeFileLocation := acn.GetArg(acn.OptStoreFileLocation).(string) privateEndpoint := acn.GetArg(acn.OptPrivateEndpoint).(string) infravnet := acn.GetArg(acn.OptInfrastructureNetworkID).(string) nodeID := acn.GetArg(acn.OptNodeID).(string) clientDebugCmd := acn.GetArg(acn.OptDebugCmd).(string) clientDebugArg := acn.GetArg(acn.OptDebugArg).(string) cmdLineConfigPath := acn.GetArg(acn.OptCNSConfigPath).(string) telemetryDaemonEnabled := acn.GetArg(acn.OptTelemetryService).(bool) cniConflistFilepathArg := acn.GetArg(acn.OptCNIConflistFilepath).(string) cniConflistScenarioArg := acn.GetArg(acn.OptCNIConflistScenario).(string) if vers { printVersion() os.Exit(0) } // Initialize CNS. var ( err error config common.ServiceConfig endpointStateStore store.KeyValueStore ) config.Version = version config.Name = name // Create a channel to receive unhandled errors from CNS. config.ErrChan = rootErrCh // Create logging provider. logger.InitLogger(name, logLevel, logTarget, logDirectory) if clientDebugCmd != "" { err := cnscli.HandleCNSClientCommands(rootCtx, clientDebugCmd, clientDebugArg) if err != nil { fmt.Println(err) os.Exit(1) } os.Exit(0) } if !telemetryEnabled { logger.Errorf("[Azure CNS] Cannot disable telemetry via cmdline. 
Update cns_config.json to disable telemetry.") } cnsconfig, err := configuration.ReadConfig(cmdLineConfigPath) if err != nil { if errors.Is(err, fs.ErrNotExist) { logger.Warnf("config file does not exist, using default") cnsconfig = &configuration.CNSConfig{} } else { logger.Errorf("fatal: failed to read cns config: %v", err) os.Exit(1) } } configuration.SetCNSConfigDefaults(cnsconfig) disableTelemetry := cnsconfig.TelemetrySettings.DisableAll if !disableTelemetry { ts := cnsconfig.TelemetrySettings aiConfig := aitelemetry.AIConfig{ AppName: name, AppVersion: version, BatchSize: ts.TelemetryBatchSizeBytes, BatchInterval: ts.TelemetryBatchIntervalInSecs, RefreshTimeout: ts.RefreshIntervalInSecs, DisableMetadataRefreshThread: ts.DisableMetadataRefreshThread, DebugMode: ts.DebugMode, } if aiKey := cnsconfig.TelemetrySettings.AppInsightsInstrumentationKey; aiKey != "" { logger.InitAIWithIKey(aiConfig, aiKey, ts.DisableTrace, ts.DisableMetric, ts.DisableEvent) } else { logger.InitAI(aiConfig, ts.DisableTrace, ts.DisableMetric, ts.DisableEvent) } if cnsconfig.TelemetrySettings.ConfigSnapshotIntervalInMins > 0 { go metric.SendCNSConfigSnapshot(rootCtx, cnsconfig) } } logger.Printf("[Azure CNS] Using config: %+v", cnsconfig) _, envEnableConflistGeneration := os.LookupEnv(envVarEnableCNIConflistGeneration) var conflistGenerator restserver.CNIConflistGenerator if cnsconfig.EnableCNIConflistGeneration || envEnableConflistGeneration { conflistFilepath := cnsconfig.CNIConflistFilepath if cniConflistFilepathArg != "" { // allow the filepath to get overidden by command line arg conflistFilepath = cniConflistFilepathArg } writer, newWriterErr := acnfs.NewAtomicWriter(conflistFilepath) if newWriterErr != nil { logger.Errorf("unable to create atomic writer to generate cni conflist: %v", newWriterErr) os.Exit(1) } // allow the scenario to get overridden by command line arg scenarioString := cnsconfig.CNIConflistScenario if cniConflistScenarioArg != "" { scenarioString = cniConflistScenarioArg } switch scenario := cniConflistScenario(scenarioString); scenario { case scenarioV4Overlay: conflistGenerator = &cniconflist.V4OverlayGenerator{Writer: writer} case scenarioDualStackOverlay: conflistGenerator = &cniconflist.DualStackOverlayGenerator{Writer: writer} case scenarioOverlay: conflistGenerator = &cniconflist.OverlayGenerator{Writer: writer} case scenarioCilium: conflistGenerator = &cniconflist.CiliumGenerator{Writer: writer} case scenarioSWIFT: conflistGenerator = &cniconflist.SWIFTGenerator{Writer: writer} default: logger.Errorf("unable to generate cni conflist for unknown scenario: %s", scenario) os.Exit(1) } } // configure zap logger zconfig := zap.NewProductionConfig() zconfig.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder if z, err = zconfig.Build(); err != nil { fmt.Printf("failed to create logger: %v", err) os.Exit(1) } // start the healthz/readyz/metrics server readyCh := make(chan any) readyChecker := healthz.CheckHandler{ Checker: healthz.Checker(func(*http.Request) error { select { default: return errors.New("not ready") case <-readyCh: } return nil }), } healthzHandler, err := healthserver.NewHealthzHandlerWithChecks(&healthserver.Config{PingAPIServer: cnsconfig.EnableAPIServerHealthPing}) if err != nil { logger.Errorf("unable to initialize a healthz handler: %v", err) return } go healthserver.Start(z, cnsconfig.MetricsBindAddress, healthzHandler, readyChecker) nmaConfig, err := nmagent.NewConfig(cnsconfig.WireserverIP) if err != nil { logger.Errorf("[Azure CNS] Failed to produce NMAgent 
config from the supplied wireserver ip: %v", err) return } nmaClient, err := nmagent.NewClient(nmaConfig) if err != nil { logger.Errorf("[Azure CNS] Failed to start nmagent client due to error: %v", err) return } // copy ChannelMode from cnsconfig to HTTPRemoteRestService config config.ChannelMode = cnsconfig.ChannelMode if cnsconfig.ChannelMode == cns.Managed { privateEndpoint = cnsconfig.ManagedSettings.PrivateEndpoint infravnet = cnsconfig.ManagedSettings.InfrastructureNetworkID nodeID = cnsconfig.ManagedSettings.NodeID } if isManaged, ok := acn.GetArg(acn.OptManaged).(bool); ok && isManaged { config.ChannelMode = cns.Managed } homeAzMonitor := restserver.NewHomeAzMonitor(nmaClient, time.Duration(cnsconfig.AZRSettings.PopulateHomeAzCacheRetryIntervalSecs)*time.Second) // homeAz monitor is only required when there is a direct channel between DNC and CNS. // This will prevent the monitor from unnecessarily calling NMA APIs for other scenarios such as AKS-swift, swiftv2 if cnsconfig.ChannelMode == cns.Direct { homeAzMonitor.Start() defer homeAzMonitor.Stop() } if telemetryDaemonEnabled { logger.Printf("CNI Telemetry is enabled") go startTelemetryService(rootCtx) } // Log platform information. logger.Printf("Running on %v", platform.GetOSInfo()) err = platform.CreateDirectory(storeFileLocation) if err != nil { logger.Errorf("Failed to create File Store directory %s, due to Error:%v", storeFileLocation, err.Error()) return } lockclient, err := processlock.NewFileLock(platform.CNILockPath + name + store.LockExtension) if err != nil { logger.Printf("Error initializing file lock:%v", err) return } // Create the key value store. storeFileName := storeFileLocation + name + ".json" config.Store, err = store.NewJsonFileStore(storeFileName, lockclient, nil) if err != nil { logger.Errorf("Failed to create store file: %s, due to error %v\n", storeFileName, err) return } // Initialize endpoint state store if cns is managing endpoint state. if cnsconfig.ManageEndpointState { logger.Printf("[Azure CNS] Configured to manage endpoints state") endpointStoreLock, err := processlock.NewFileLock(platform.CNILockPath + endpointStoreName + store.LockExtension) // nolint if err != nil { logger.Printf("Error initializing endpoint state file lock:%v", err) return } defer endpointStoreLock.Unlock() // nolint err = platform.CreateDirectory(endpointStorePath) if err != nil { logger.Errorf("Failed to create File Store directory %s, due to Error:%v", storeFileLocation, err.Error()) return } // Create the key value store. storeFileName := endpointStorePath + endpointStoreName + ".json" logger.Printf("EndpointStoreState path is %s", storeFileName) endpointStateStore, err = store.NewJsonFileStore(storeFileName, endpointStoreLock, nil) if err != nil { logger.Errorf("Failed to create endpoint state store file: %s, due to error %v\n", storeFileName, err) return } } wsProxy := wireserver.Proxy{ Host: cnsconfig.WireserverIP, HTTPClient: &http.Client{}, } wsclient := &wireserver.Client{ HostPort: cnsconfig.WireserverIP, HTTPClient: &http.Client{}, Logger: logger.Log, } imdsClient := imds.NewClient() httpRemoteRestService, err := restserver.NewHTTPRestService(&config, wsclient, &wsProxy, &restserver.IPtablesProvider{}, nmaClient, endpointStateStore, conflistGenerator, homeAzMonitor, imdsClient) if err != nil { logger.Errorf("Failed to create CNS object, err:%v.\n", err) return } // Set CNS options. 
httpRemoteRestService.SetOption(acn.OptCnsURL, cnsURL) httpRemoteRestService.SetOption(acn.OptCnsPort, cnsPort) httpRemoteRestService.SetOption(acn.OptNetPluginPath, cniPath) httpRemoteRestService.SetOption(acn.OptNetPluginConfigFile, cniConfigFile) httpRemoteRestService.SetOption(acn.OptCreateDefaultExtNetworkType, createDefaultExtNetworkType) httpRemoteRestService.SetOption(acn.OptHttpConnectionTimeout, httpConnectionTimeout) httpRemoteRestService.SetOption(acn.OptHttpResponseHeaderTimeout, httpResponseHeaderTimeout) httpRemoteRestService.SetOption(acn.OptProgramSNATIPTables, cnsconfig.ProgramSNATIPTables) httpRemoteRestService.SetOption(acn.OptManageEndpointState, cnsconfig.ManageEndpointState) // Create default ext network if commandline option is set if len(strings.TrimSpace(createDefaultExtNetworkType)) > 0 { if err := hnsclient.CreateDefaultExtNetwork(createDefaultExtNetworkType); err == nil { logger.Printf("[Azure CNS] Successfully created default ext network") } else { logger.Printf("[Azure CNS] Failed to create default ext network due to error: %v", err) return } } logger.Printf("[Azure CNS] Initialize HTTPRemoteRestService") if httpRemoteRestService != nil { if cnsconfig.UseHTTPS { config.TLSSettings = localtls.TlsSettings{ TLSSubjectName: cnsconfig.TLSSubjectName, TLSCertificatePath: cnsconfig.TLSCertificatePath, TLSPort: cnsconfig.TLSPort, KeyVaultURL: cnsconfig.KeyVaultSettings.URL, KeyVaultCertificateName: cnsconfig.KeyVaultSettings.CertificateName, MSIResourceID: cnsconfig.MSISettings.ResourceID, KeyVaultCertificateRefreshInterval: time.Duration(cnsconfig.KeyVaultSettings.RefreshIntervalInHrs) * time.Hour, UseMTLS: cnsconfig.UseMTLS, MinTLSVersion: cnsconfig.MinTLSVersion, } } err = httpRemoteRestService.Init(&config) if err != nil { logger.Errorf("Failed to init HTTPService, err:%v.\n", err) return } } // Setting the remote ARP MAC address to 12-34-56-78-9a-bc on windows for external traffic if HNS is enabled err = platform.SetSdnRemoteArpMacAddress(rootCtx) if err != nil { logger.Errorf("Failed to set remote ARP MAC address: %v", err) return } // We are only setting the PriorityVLANTag in 'cns.Direct' mode, because it neatly maps today, to 'isUsingMultitenancy' // In the future, we would want to have a better CNS flag, to explicitly say, this CNS is using multitenancy if cnsconfig.ChannelMode == cns.Direct { // Set Mellanox adapter's PriorityVLANTag value to 3 if adapter exists // reg key value for PriorityVLANTag = 3 --> Packet priority and VLAN enabled // for more details goto https://docs.nvidia.com/networking/display/winof2v230/Configuring+the+Driver+Registry+Keys#ConfiguringtheDriverRegistryKeys-GeneralRegistryKeysGeneralRegistryKeys if platform.HasMellanoxAdapter() { go platform.MonitorAndSetMellanoxRegKeyPriorityVLANTag(rootCtx, cnsconfig.MellanoxMonitorIntervalSecs) } // if swiftv2 scenario is enabled, we need to initialize the ServiceFabric(standalone) swiftv2 middleware to process IPConfigsRequests // RequestIPConfigs() will be invoked only for swiftv2 or swiftv1 k8s scenarios. For swiftv1 direct mode different api will be invoked. 
		// So initializing this middleware always under direct mode should not affect any other scenarios
		swiftV2Middleware := &middlewares.StandaloneSWIFTv2Middleware{}
		httpRemoteRestService.AttachIPConfigsHandlerMiddleware(swiftV2Middleware)
	}

	// Initialize state if CNS is running in CRD mode
	// State must be initialized before we start HTTPRestService
	if config.ChannelMode == cns.CRD {
		// Add APIServer FQDN to Log metadata
		logger.Log.SetAPIServer(os.Getenv("KUBERNETES_SERVICE_HOST"))

		// Check the CNI statefile mount, and if the file is empty
		// stub an empty JSON object
		if err := cnireconciler.WriteObjectToCNIStatefile(); err != nil {
			logger.Errorf("Failed to write empty object to CNI state: %v", err)
			return
		}

		// We might be configured to reinitialize state from the CNI instead of the apiserver.
		// If so, we should check that the CNI is new enough to support the state commands,
		// otherwise we fall back to the existing behavior.
		if cnsconfig.InitializeFromCNI {
			var isGoodVer bool
			isGoodVer, err = cnireconciler.IsDumpStateVer()
			if err != nil {
				logger.Errorf("error checking CNI ver: %v", err)
			}

			// override the prior config flag with the result of the ver check.
			cnsconfig.InitializeFromCNI = isGoodVer

			if cnsconfig.InitializeFromCNI {
				// Set the PodInfoVersion by initialization type, so that the
				// PodInfo maps use the correct key schema
				cns.GlobalPodInfoScheme = cns.InterfaceIDPodInfoScheme
			}
		}

		// If cns manageendpointstate is true, then cns maintains its own state and reconciles from it.
		// in this case, cns maintains state with containerid as key and so in-memory cache can lookup
		// and update based on container id.
		if cnsconfig.ManageEndpointState {
			cns.GlobalPodInfoScheme = cns.InfraIDPodInfoScheme
		}

		logger.Printf("Set GlobalPodInfoScheme %v (InitializeFromCNI=%t)", cns.GlobalPodInfoScheme, cnsconfig.InitializeFromCNI)

		err = InitializeCRDState(rootCtx, httpRemoteRestService, cnsconfig)
		if err != nil {
			logger.Errorf("Failed to start CRD Controller, err:%v.\n", err)
			return
		}

		if cnsconfig.EnableSwiftV2 {
			// No-op for linux, mapping is set for windows in aks swiftv2 scenario
			logger.Printf("Fetching backend nics for the node")
			if err = httpRemoteRestService.SavePnpIDMacaddressMapping(rootCtx); err != nil {
				logger.Errorf("Failed to fetch PnpIDMacaddress mapping: %v", err)
			}

			// No-op for linux, setting primary macaddress if VF is enabled on the nics for aks swiftv2 windows
			logger.Printf("Setting VF for accelnet nics if feature is enabled (only on windows VM & swiftV2 scenario)")
			if err = httpRemoteRestService.SetVFForAccelnetNICs(); err != nil {
				logger.Errorf("Failed to set VF for accelnet NICs: %v", err)
			}
		}
	}

	// AzureHost channelmode indicates Nodesubnet. IPs are to be fetched from NMagent.
	if config.ChannelMode == cns.AzureHost {
		if !cnsconfig.ManageEndpointState {
			logger.Errorf("ManageEndpointState must be set to true for AzureHost mode")
			return
		}

		// If cns manageendpointstate is true, then cns maintains its own state and reconciles from it.
		// in this case, cns maintains state with containerid as key and so in-memory cache can lookup
		// and update based on container id.
cns.GlobalPodInfoScheme = cns.InfraIDPodInfoScheme var podInfoByIPProvider cns.PodInfoByIPProvider podInfoByIPProvider, err = getPodInfoByIPProvider(rootCtx, cnsconfig, httpRemoteRestService, nil, "") if err != nil { logger.Errorf("[Azure CNS] Failed to get PodInfoByIPProvider: %v", err) return } err = httpRemoteRestService.InitializeNodeSubnet(rootCtx, podInfoByIPProvider) if err != nil { logger.Errorf("[Azure CNS] Failed to initialize node subnet: %v", err) return } } // Initialize multi-tenant controller if the CNS is running in MultiTenantCRD mode. // It must be started before we start HTTPRemoteRestService. if config.ChannelMode == cns.MultiTenantCRD { err = InitializeMultiTenantController(rootCtx, httpRemoteRestService, *cnsconfig) if err != nil { logger.Errorf("Failed to start multiTenantController, err:%v.\n", err) return } } if cnsconfig.EnableSwiftV2 && cnsconfig.EnableK8sDevicePlugin { // Create device plugin manager instance pluginManager := deviceplugin.NewPluginManager(z) pluginManager.AddPlugin(mtv1alpha1.DeviceTypeVnetNIC, initialVnetNICCount) pluginManager.AddPlugin(mtv1alpha1.DeviceTypeInfiniBandNIC, initialIBNICCount) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start device plugin manager in a separate goroutine go func() { retryCount := 0 ticker := time.NewTicker(defaultDevicePluginRetryInterval) // Ensure the ticker is stopped on exit defer ticker.Stop() for { select { case <-ctx.Done(): z.Info("Context canceled, stopping plugin manager") return case <-ticker.C: if pluginErr := pluginManager.Run(ctx); pluginErr != nil { z.Error("plugin manager exited with error", zap.Error(pluginErr)) retryCount++ // Implementing a basic circuit breaker if retryCount >= defaultDevicePluginMaxRetryCount { z.Error("Max retries reached, stopping plugin manager") return } } else { return } } } }() // go routine to poll node info crd and update device counts go func() { if pollErr := pollNodeInfoCRDAndUpdatePlugin(ctx, z, pluginManager); pollErr != nil { z.Error("Error in pollNodeInfoCRDAndUpdatePlugin", zap.Error(pollErr)) } }() } // Conditionally initialize and start the gRPC server if cnsconfig.GRPCSettings.Enable { // Define gRPC server settings settings := grpc.ServerSettings{ IPAddress: cnsconfig.GRPCSettings.IPAddress, Port: cnsconfig.GRPCSettings.Port, } // Initialize CNS service cnsService := &grpc.CNS{Logger: z} // Create a new gRPC server server, grpcErr := grpc.NewServer(settings, cnsService, z) if grpcErr != nil { logger.Errorf("[Listener] Could not initialize gRPC server: %v", grpcErr) return } // Start the gRPC server go func() { if grpcErr := server.Start(); grpcErr != nil { logger.Errorf("[Listener] Could not start gRPC server: %v", grpcErr) return } }() } // if user provides cns url by -c option, then only start HTTP remote server using this url logger.Printf("[Azure CNS] Start HTTP Remote server") if httpRemoteRestService != nil { if cnsconfig.EnablePprof { httpRemoteRestService.RegisterPProfEndpoints() } err = httpRemoteRestService.Start(&config) if err != nil { logger.Errorf("Failed to start CNS, err:%v.\n", err) return } } // if user does not provide cns url by -c option, then start http local server // TODO: we will deprecated -c option in next phase and start local server in any case if config.Server.EnableLocalServer { logger.Printf("[Azure CNS] Start HTTP local server") var localServerURL string if config.Server.Port != "" { localServerURL = fmt.Sprintf(defaultLocalServerIP + ":" + config.Server.Port) } else { localServerURL = 
fmt.Sprintf(defaultLocalServerIP + ":" + defaultLocalServerPort)
		}
		httpLocalRestService := restserverv2.New(httpRemoteRestService)
		if httpLocalRestService != nil {
			go func() {
				err = httpLocalRestService.Start(rootCtx, localServerURL)
				if err != nil {
					logger.Errorf("Failed to start local echo server, err:%v.\n", err)
					return
				}
			}()
		}
	}

	if cnsconfig.EnableAsyncPodDelete {
		// Start fs watcher here
		z.Info("AsyncPodDelete is enabled")
		logger.Printf("AsyncPodDelete is enabled")
		cnsclient, err := cnsclient.New("", cnsReqTimeout) // nolint
		if err != nil {
			z.Error("failed to create cnsclient", zap.Error(err))
		}
		go func() {
			_ = retry.Do(func() error {
				z.Info("starting fsnotify watcher to process missed Pod deletes")
				logger.Printf("starting fsnotify watcher to process missed Pod deletes")
				var endpointCleanup fsnotify.ReleaseIPsClient = cnsclient
				// using endpointmanager implementation for stateless CNI scenario to remove HNS endpoint alongside the IPs
				if cnsconfig.IsStalessCNIWindows() {
					endpointCleanup = endpointmanager.WithPlatformReleaseIPsManager(cnsclient)
				}
				w, err := fsnotify.New(endpointCleanup, cnsconfig.AsyncPodDeletePath, z)
				if err != nil {
					z.Error("failed to create fsnotify watcher", zap.Error(err))
					return errors.Wrap(err, "failed to create fsnotify watcher, will retry")
				}
				if err := w.Start(rootCtx); err != nil {
					z.Error("failed to start fsnotify watcher, will retry", zap.Error(err))
					return errors.Wrap(err, "failed to start fsnotify watcher, will retry")
				}
				return nil
			}, retry.DelayType(retry.BackOffDelay), retry.Attempts(0), retry.Context(rootCtx)) // infinite cancellable exponential backoff retrier
		}()
	}

	if !disableTelemetry {
		go metric.SendHeartBeat(rootCtx, time.Minute*time.Duration(cnsconfig.TelemetrySettings.HeartBeatIntervalInMins), homeAzMonitor, cnsconfig.ChannelMode)
		go httpRemoteRestService.SendNCSnapShotPeriodically(rootCtx, cnsconfig.TelemetrySettings.SnapshotIntervalInMins)
	}

	// If CNS is running on managed DNC mode
	if config.ChannelMode == cns.Managed {
		if privateEndpoint == "" || infravnet == "" || nodeID == "" {
			logger.Errorf("[Azure CNS] Missing required values to run in managed mode: PrivateEndpoint: %s InfrastructureNetworkID: %s NodeID: %s", privateEndpoint, infravnet, nodeID)
			return
		}

		httpRemoteRestService.SetOption(acn.OptPrivateEndpoint, privateEndpoint)
		httpRemoteRestService.SetOption(acn.OptInfrastructureNetworkID, infravnet)
		httpRemoteRestService.SetOption(acn.OptNodeID, nodeID)

		// Passing in the default http client that already implements Do function
		standardClient := http.DefaultClient
		registerErr := registerNode(rootCtx, standardClient, httpRemoteRestService, privateEndpoint, infravnet, nodeID, nmaClient)
		if registerErr != nil {
			logger.Errorf("[Azure CNS] Registering Node failed with error: %v PrivateEndpoint: %s InfrastructureNetworkID: %s NodeID: %s", registerErr, privateEndpoint, infravnet, nodeID)
			return
		}
		go func(ep, vnet, node string) {
			// Periodically poll DNC for node updates
			tickerChannel := time.Tick(time.Duration(cnsconfig.ManagedSettings.NodeSyncIntervalInSeconds) * time.Second)
			for {
				<-tickerChannel
				httpRemoteRestService.SyncNodeStatus(ep, vnet, node, json.RawMessage{})
			}
		}(privateEndpoint, infravnet, nodeID)
	}

	if config.ChannelMode == cns.AzureHost {
		// at this point, rest service is running. We can now start serving new requests. So call StartNodeSubnet, which
		// will fetch secondary IPs and generate conflist.
Do not move this all before rest service start - this will cause // CNI to start sending requests, and if the service doesn't start successfully, the requests will fail. httpRemoteRestService.StartNodeSubnet(rootCtx) } // mark the service as "ready" close(readyCh) // block until process exiting <-rootCtx.Done() if len(strings.TrimSpace(createDefaultExtNetworkType)) > 0 { if err := hnsclient.DeleteDefaultExtNetwork(); err == nil { logger.Printf("[Azure CNS] Successfully deleted default ext network") } else { logger.Printf("[Azure CNS] Failed to delete default ext network due to error: %v", err) } } logger.Printf("stop cns service") // Cleanup. if httpRemoteRestService != nil { httpRemoteRestService.Stop() } if err = lockclient.Unlock(); err != nil { logger.Errorf("lockclient cns unlock error:%v", err) } logger.Printf("CNS exited") logger.Close() } // Poll CRD until it's set and update PluginManager func pollNodeInfoCRDAndUpdatePlugin(ctx context.Context, zlog *zap.Logger, pluginManager *deviceplugin.PluginManager) error { kubeConfig, err := ctrl.GetConfig() if err != nil { logger.Errorf("Failed to get kubeconfig for request controller: %v", err) return errors.Wrap(err, "failed to get kubeconfig") } kubeConfig.UserAgent = "azure-cns-" + version clientset, err := kubernetes.NewForConfig(kubeConfig) if err != nil { return errors.Wrap(err, "failed to build clientset") } nodeName, err := configuration.NodeName() if err != nil { return errors.Wrap(err, "failed to get NodeName") } node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { return errors.Wrapf(err, "failed to get node %s", nodeName) } // check the Node labels for Swift V2 if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; !ok { zlog.Info("Node is not labeled for Swift V2, skipping polling nodeinfo crd") return nil } directcli, err := client.New(kubeConfig, client.Options{Scheme: multitenancy.Scheme}) if err != nil { return errors.Wrap(err, "failed to create ctrl client") } nodeInfoCli := multitenancy.NodeInfoClient{ Cli: directcli, } ticker := time.NewTicker(defaultNodeInfoCRDPollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): zlog.Info("Polling context canceled, exiting") return nil case <-ticker.C: // Fetch the CRD status nodeInfo, err := nodeInfoCli.Get(ctx, node.Name) if err != nil { zlog.Error("Error fetching nodeinfo CRD", zap.Error(err)) return errors.Wrap(err, "failed to get nodeinfo crd") } // Check if the status is set if !cmp.Equal(nodeInfo.Status, mtv1alpha1.NodeInfoStatus{}) && len(nodeInfo.Status.DeviceInfos) > 0 { // Create a map to count devices by type deviceCounts := map[mtv1alpha1.DeviceType]int{ mtv1alpha1.DeviceTypeVnetNIC: 0, mtv1alpha1.DeviceTypeInfiniBandNIC: 0, } // Aggregate device counts from the CRD for _, deviceInfo := range nodeInfo.Status.DeviceInfos { switch deviceInfo.DeviceType { case mtv1alpha1.DeviceTypeVnetNIC, mtv1alpha1.DeviceTypeInfiniBandNIC: deviceCounts[deviceInfo.DeviceType]++ default: zlog.Error("Unknown device type", zap.String("deviceType", string(deviceInfo.DeviceType))) } } // Update the plugin manager with device counts for deviceType, count := range deviceCounts { pluginManager.TrackDevices(deviceType, count) } // Exit polling loop once the CRD status is successfully processed return nil } } } } func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig) error { var multiTenantController multitenantcontroller.RequestController kubeConfig, err := 
ctrl.GetConfig() kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version) if err != nil { return err } // convert interface type to implementation type httpRestServiceImpl, ok := httpRestService.(*restserver.HTTPRestService) if !ok { logger.Errorf("Failed to convert interface httpRestService to implementation: %v", httpRestService) return fmt.Errorf("Failed to convert interface httpRestService to implementation: %v", httpRestService) } // Set orchestrator type orchestrator := cns.SetOrchestratorTypeRequest{ OrchestratorType: cns.Kubernetes, } httpRestServiceImpl.SetNodeOrchestrator(&orchestrator) // Create multiTenantController. multiTenantController, err = multitenantoperator.New(httpRestServiceImpl, kubeConfig) if err != nil { logger.Errorf("Failed to create multiTenantController:%v", err) return err } // Wait for multiTenantController to start. go func() { for { if err := multiTenantController.Start(ctx); err != nil { logger.Errorf("Failed to start multiTenantController: %v", err) } else { logger.Printf("Exiting multiTenantController") return } // Retry after 1sec time.Sleep(time.Second) } }() for { if multiTenantController.IsStarted() { logger.Printf("MultiTenantController is started") break } logger.Printf("Waiting for multiTenantController to start...") time.Sleep(time.Millisecond * 500) } // TODO: do we need this to be running? logger.Printf("Starting SyncHostNCVersion") go func() { // Periodically poll vfp programmed NC version from NMAgent tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) for { select { case <-tickerChannel: timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) httpRestServiceImpl.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) cancel() case <-ctx.Done(): return } } }() return nil } type nodeNetworkConfigGetter interface { Get(context.Context) (*v1alpha.NodeNetworkConfig, error) } type ipamStateReconciler interface { ReconcileIPAMStateForSwift(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode } // TODO(rbtr) where should this live?? // reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { // Get nnc using direct client nnc, err := cli.Get(ctx) if err != nil { if crd.IsNotDefined(err) { return errors.Wrap(err, "failed to init CNS state: NNC CRD is not defined") } if apierrors.IsNotFound(err) { return errors.Wrap(err, "failed to init CNS state: NNC not found") } return errors.Wrap(err, "failed to init CNS state: failed to get NNC CRD") } logger.Printf("Retrieved NNC: %+v", nnc) if !nnc.DeletionTimestamp.IsZero() { return errors.New("failed to init CNS state: NNC is being deleted") } // If there are no NCs, we can't initialize our state and we should fail out. if len(nnc.Status.NetworkContainers) == 0 { return errors.New("failed to init CNS state: no NCs found in NNC CRD") } // Get previous PodInfo state from podInfoByIPProvider podInfoByIP, err := podInfoByIPProvider.PodInfoByIP() if err != nil { return errors.Wrap(err, "provider failed to provide PodInfoByIP") } ncReqs := make([]*cns.CreateNetworkContainerRequest, len(nnc.Status.NetworkContainers)) // For each NC, we need to create a CreateNetworkContainerRequest and use it to rebuild our state. 
	for i := range nnc.Status.NetworkContainers {
		var (
			ncRequest *cns.CreateNetworkContainerRequest
			err       error
		)
		switch nnc.Status.NetworkContainers[i].AssignmentMode { //nolint:exhaustive // skipping dynamic case
		case v1alpha.Static:
			ncRequest, err = nncctrl.CreateNCRequestFromStaticNC(nnc.Status.NetworkContainers[i])
		default: // For backward compatibility, default will be treated as Dynamic too.
			ncRequest, err = nncctrl.CreateNCRequestFromDynamicNC(nnc.Status.NetworkContainers[i])
		}

		if err != nil {
			return errors.Wrapf(err, "failed to convert NNC status to network container request, "+
				"assignmentMode: %s", nnc.Status.NetworkContainers[i].AssignmentMode)
		}

		ncReqs[i] = ncRequest
	}

	// Call cnsclient init cns passing those two things.
	if err := restserver.ResponseCodeToError(ipamReconciler.ReconcileIPAMStateForSwift(ncReqs, podInfoByIP, nnc)); err != nil {
		return errors.Wrap(err, "failed to reconcile CNS IPAM state")
	}

	return nil
}

// InitializeCRDState builds and starts the CRD controllers.
func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error {
	// convert interface type to implementation type
	httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService)
	if !ok {
		logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService)
		return fmt.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService)
	}

	// Set orchestrator type
	orchestrator := cns.SetOrchestratorTypeRequest{
		OrchestratorType: cns.KubernetesCRD,
	}
	httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator)

	// build default clientset.
	kubeConfig, err := ctrl.GetConfig()
	if err != nil {
		logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err)
		return errors.Wrap(err, "failed to get kubeconfig")
	}
	kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version)

	clientset, err := kubernetes.NewForConfig(kubeConfig)
	if err != nil {
		return errors.Wrap(err, "failed to build clientset")
	}

	// get nodename for scoping kube requests to node.
	nodeName, err := configuration.NodeName()
	if err != nil {
		return errors.Wrap(err, "failed to get NodeName")
	}

	node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return errors.Wrapf(err, "failed to get node %s", nodeName)
	}

	// check the Node labels for Swift V2
	if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; ok {
		cnsconfig.EnableSwiftV2 = true
		cnsconfig.WatchPods = true
		if nodeInfoErr := createOrUpdateNodeInfoCRD(ctx, kubeConfig, node); nodeInfoErr != nil {
			return errors.Wrap(nodeInfoErr, "error creating or updating nodeinfo crd")
		}
	}

	// perform state migration from CNI in case CNS is set to manage the endpoint state and has empty state
	if cnsconfig.EnableStateMigration && !httpRestServiceImplementation.EndpointStateStore.Exists() {
		if err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore); err != nil {
			return errors.Wrap(err, "failed to create CNS EndpointState From CNI")
		}
		// endpoint state needs to be loaded in memory so the subsequent Delete calls remove the state and release the IPs.
		if err = httpRestServiceImplementation.EndpointStateStore.Read(restserver.EndpointStoreKey, &httpRestServiceImplementation.EndpointState); err != nil {
			return errors.Wrap(err, "failed to restore endpoint state")
		}
	}

	podInfoByIPProvider, err := getPodInfoByIPProvider(ctx, cnsconfig, httpRestServiceImplementation, clientset, nodeName)
	if err != nil {
		return errors.Wrap(err, "failed to initialize ip state")
	}

	// create scoped kube clients.
	directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme})
	if err != nil {
		return errors.Wrap(err, "failed to create ctrl client")
	}
	directnnccli := nodenetworkconfig.NewClient(directcli)
	if err != nil {
		return errors.Wrap(err, "failed to create NNC client")
	}
	// TODO(rbtr): nodename and namespace should be in the cns config
	directscopedcli := nncctrl.NewScopedClient(directnnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})

	logger.Printf("Reconciling initial CNS state")
	// apiserver nnc might not be registered or api server might be down and crashloop backoff puts us outside of 5-10 minutes we have for
	// aks addons to come up so retry a bit more aggressively here.
	// will retry 10 times maxing out at a minute taking about 8 minutes before it gives up.
	attempt := 0
	err = retry.Do(func() error {
		attempt++
		logger.Printf("reconciling initial CNS state attempt: %d", attempt)
		err = reconcileInitialCNSState(ctx, directscopedcli, httpRestServiceImplementation, podInfoByIPProvider)
		if err != nil {
			logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err)
		}
		return errors.Wrap(err, "failed to initialize CNS state")
	}, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute))
	if err != nil {
		return err
	}
	logger.Printf("reconciled initial CNS state after %d attempts", attempt)

	scheme := kuberuntime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil { //nolint:govet // intentional shadow
		return errors.Wrap(err, "failed to add corev1 to scheme")
	}
	if err = v1alpha.AddToScheme(scheme); err != nil {
		return errors.Wrap(err, "failed to add nodenetworkconfig/v1alpha to scheme")
	}
	if err = cssv1alpha1.AddToScheme(scheme); err != nil {
		return errors.Wrap(err, "failed to add clustersubnetstate/v1alpha1 to scheme")
	}
	if err = mtv1alpha1.AddToScheme(scheme); err != nil {
		return errors.Wrap(err, "failed to add multitenantpodnetworkconfig/v1alpha1 to scheme")
	}

	// Set Selector options on the Manager cache which are used
	// to perform *server-side* filtering of the cached objects. This is very important
	// for high node/pod count clusters, as it keeps us from watching objects at the
	// whole cluster scope when we are only interested in the Node's scope.
cacheOpts := cache.Options{ Scheme: scheme, ByObject: map[client.Object]cache.ByObject{ &v1alpha.NodeNetworkConfig{}: { Namespaces: map[string]cache.Config{ "kube-system": {FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName})}, }, }, }, } if cnsconfig.WatchPods { cacheOpts.ByObject[&corev1.Pod{}] = cache.ByObject{ Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}), } } if cnsconfig.EnableSubnetScarcity { cacheOpts.ByObject[&cssv1alpha1.ClusterSubnetState{}] = cache.ByObject{ Namespaces: map[string]cache.Config{ "kube-system": {}, }, } } managerOpts := ctrlmgr.Options{ Scheme: scheme, Metrics: ctrlmetrics.Options{BindAddress: "0"}, Cache: cacheOpts, Logger: ctrlzap.New(), } manager, err := ctrl.NewManager(kubeConfig, managerOpts) if err != nil { return errors.Wrap(err, "failed to create manager") } // this cachedscopedclient is built using the Manager's cached client, which is // NOT SAFE TO USE UNTIL THE MANAGER IS STARTED! // This is okay because it is only used to build the IPAMPoolMonitor, which does not // attempt to use the client until it has received a NodeNetworkConfig to update, and // that can only happen once the Manager has started and the NodeNetworkConfig // reconciler has pushed the Monitor a NodeNetworkConfig. cachedscopedcli := nncctrl.NewScopedClient(nodenetworkconfig.NewClient(manager.GetClient()), types.NamespacedName{Namespace: "kube-system", Name: nodeName}) // Build the IPAM Pool monitor var poolMonitor cns.IPAMPoolMonitor cssCh := make(chan cssv1alpha1.ClusterSubnetState) ipDemandCh := make(chan int) if cnsconfig.EnableIPAMv2 { cssSrc := func(context.Context) ([]cssv1alpha1.ClusterSubnetState, error) { return nil, nil } if cnsconfig.EnableSubnetScarcity { cssSrc = clustersubnetstate.NewClient(manager.GetClient()).List } nncCh := make(chan v1alpha.NodeNetworkConfig) pmv2 := ipampoolv2.NewMonitor(z, httpRestServiceImplementation, cachedscopedcli, ipDemandCh, nncCh, cssCh) obs := metrics.NewLegacyMetricsObserver(httpRestService.GetPodIPConfigState, cachedscopedcli.Get, cssSrc) pmv2.WithLegacyMetricsObserver(obs) poolMonitor = pmv2.AsV1(nncCh) } else { poolOpts := ipampool.Options{ RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond, } poolMonitor = ipampool.NewMonitor(httpRestServiceImplementation, cachedscopedcli, cssCh, &poolOpts) } // Start building the NNC Reconciler // get CNS Node IP to compare NC Node IP with this Node IP to ensure NCs were created for this node nodeIP := configuration.NodeIP() nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, poolMonitor, nodeIP) // pass Node to the Reconciler for Controller xref // IPAMv1 - reconcile only status changes (where generation doesn't change). // IPAMv2 - reconcile all updates. 
filterGenerationChange := !cnsconfig.EnableIPAMv2 if err := nncReconciler.SetupWithManager(manager, node, filterGenerationChange); err != nil { //nolint:govet // intentional shadow return errors.Wrapf(err, "failed to setup nnc reconciler with manager") } if cnsconfig.EnableSubnetScarcity { // ClusterSubnetState reconciler cssReconciler := cssctrl.New(cssCh) if err := cssReconciler.SetupWithManager(manager); err != nil { return errors.Wrapf(err, "failed to setup css reconciler with manager") } } // TODO: add pod listeners based on Swift V1 vs MT/V2 configuration if cnsconfig.WatchPods { pw := podctrl.New(z) if cnsconfig.EnableIPAMv2 { hostNetworkListOpt := &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.hostNetwork": "false"})} // filter only podsubnet pods // don't relist pods more than every 500ms limit := rate.NewLimiter(rate.Every(500*time.Millisecond), 1) //nolint:gomnd // clearly 500ms pw.With(pw.NewNotifierFunc(hostNetworkListOpt, limit, ipampoolv2.PodIPDemandListener(ipDemandCh))) } if err := pw.SetupWithManager(ctx, manager); err != nil { return errors.Wrapf(err, "failed to setup pod watcher with manager") } } if cnsconfig.EnableSwiftV2 { if err := mtpncctrl.SetupWithManager(manager); err != nil { return errors.Wrapf(err, "failed to setup mtpnc reconciler with manager") } // if SWIFT v2 is enabled on CNS, attach multitenant middleware to rest service // switch here for AKS(K8s) swiftv2 middleware to process IP configs requests swiftV2Middleware := &middlewares.K8sSWIFTv2Middleware{Cli: manager.GetClient()} httpRestService.AttachIPConfigsHandlerMiddleware(swiftV2Middleware) } // start the pool Monitor before the Reconciler, since it needs to be ready to receive an // NodeNetworkConfig update by the time the Reconciler tries to send it. go func() { logger.Printf("Starting IPAM Pool Monitor") if e := poolMonitor.Start(ctx); e != nil { logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e) } }() logger.Printf("initialized and started IPAM pool monitor") // Start the Manager which starts the reconcile loop. // The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the // Monitor's internal loop. go func() { logger.Printf("Starting controller-manager.") for { if err := manager.Start(ctx); err != nil { logger.Errorf("Failed to start controller-manager: %v", err) // retry to start the request controller // inc the managerStartFailures metric for failure tracking managerStartFailures.Inc() } else { logger.Printf("Stopped controller-manager.") return } time.Sleep(time.Second) // TODO(rbtr): make this exponential backoff } }() logger.Printf("Initialized controller-manager.") for { logger.Printf("Waiting for NodeNetworkConfig reconciler to start.") // wait for the Reconciler to run once on a NNC that was made for this Node. // the nncReadyCtx has a timeout of 15 minutes, after which we will consider // this false and the NNC Reconciler stuck/failed, log and retry. nncReadyCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) // nolint // it will time out and not leak if started, err := nncReconciler.Started(nncReadyCtx); !started { logger.Errorf("NNC reconciler has not started, does the NNC exist? 
err: %v", err) nncReconcilerStartFailures.Inc() continue } logger.Printf("NodeNetworkConfig reconciler has started.") cancel() break } go func() { logger.Printf("Starting SyncHostNCVersion loop.") // Periodically poll vfp programmed NC version from NMAgent tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) for { select { case <-tickerChannel: timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) cancel() case <-ctx.Done(): logger.Printf("Stopping SyncHostNCVersion loop.") return } } }() logger.Printf("Initialized SyncHostNCVersion loop.") return nil } // getPodInfoByIPProvider returns a PodInfoByIPProvider that reads endpoint state from the configured source func getPodInfoByIPProvider( ctx context.Context, cnsconfig *configuration.CNSConfig, httpRestServiceImplementation *restserver.HTTPRestService, clientset *kubernetes.Clientset, nodeName string, ) (podInfoByIPProvider cns.PodInfoByIPProvider, err error) { switch { case cnsconfig.ManageEndpointState: logger.Printf("Initializing from self managed endpoint store") podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server if err != nil { if errors.Is(err, store.ErrKeyNotFound) { logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state") } else { return podInfoByIPProvider, errors.Wrap(err, "failed to create CNS PodInfoProvider") } } case cnsconfig.InitializeFromCNI: logger.Printf("Initializing from CNI") podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider() if err != nil { return podInfoByIPProvider, errors.Wrap(err, "failed to create CNI PodInfoProvider") } default: logger.Printf("Initializing from Kubernetes") podInfoByIPProvider = cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) { pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ //nolint:govet // ignore err shadow FieldSelector: "spec.nodeName=" + nodeName, }) if err != nil { return nil, errors.Wrap(err, "failed to list Pods for PodInfoProvider") } podInfo, err := cns.KubePodsToPodInfoByIP(pods.Items) if err != nil { return nil, errors.Wrap(err, "failed to convert Pods to PodInfoByIP") } return podInfo, nil }) } return podInfoByIPProvider, nil } // createOrUpdateNodeInfoCRD polls imds to learn the VM Unique ID and then creates or updates the NodeInfo CRD // with that vm unique ID func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, node *corev1.Node) error { imdsCli := imds.NewClient() vmUniqueID, err := imdsCli.GetVMUniqueID(ctx) if err != nil { return errors.Wrap(err, "error getting vm unique ID from imds") } directcli, err := client.New(restConfig, client.Options{Scheme: multitenancy.Scheme}) if err != nil { return errors.Wrap(err, "failed to create ctrl client") } nodeInfoCli := multitenancy.NodeInfoClient{ Cli: directcli, } nodeInfo := &mtv1alpha1.NodeInfo{ ObjectMeta: metav1.ObjectMeta{ Name: node.Name, }, Spec: mtv1alpha1.NodeInfoSpec{ VMUniqueID: vmUniqueID, }, } if err := controllerutil.SetOwnerReference(node, nodeInfo, multitenancy.Scheme); err != nil { return errors.Wrap(err, "failed to set nodeinfo owner reference to node") } if err := nodeInfoCli.CreateOrUpdate(ctx, nodeInfo, "azure-cns"); err != nil { return errors.Wrap(err, "error ensuring nodeinfo CRD exists and is 
up-to-date") } return nil } // PopulateCNSEndpointState initilizes CNS Endpoint State by Migrating the CNI state. func PopulateCNSEndpointState(endpointStateStore store.KeyValueStore) error { logger.Printf("State Migration is enabled") endpointState, err := cnireconciler.MigrateCNISate() if err != nil { return errors.Wrap(err, "failed to create CNS Endpoint state from CNI") } err = endpointStateStore.Write(restserver.EndpointStoreKey, endpointState) if err != nil { return fmt.Errorf("failed to write endpoint state to store: %w", err) } return nil }