func InitializeCRDState()

in cns/service/main.go [1375:1662]


func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error {
	// convert interface type to implementation type
	httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService)
	if !ok {
		logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService)
		return fmt.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v",
			httpRestService)
	}

	// Set orchestrator type
	orchestrator := cns.SetOrchestratorTypeRequest{
		OrchestratorType: cns.KubernetesCRD,
	}
	httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator)

	// build default clientset.
	kubeConfig, err := ctrl.GetConfig()
	if err != nil {
		logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err)
		return errors.Wrap(err, "failed to get kubeconfig")
	}
	kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version)

	clientset, err := kubernetes.NewForConfig(kubeConfig)
	if err != nil {
		return errors.Wrap(err, "failed to build clientset")
	}

	// get nodename for scoping kube requests to node.
	nodeName, err := configuration.NodeName()
	if err != nil {
		return errors.Wrap(err, "failed to get NodeName")
	}

	node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return errors.Wrapf(err, "failed to get node %s", nodeName)
	}

	// check the Node labels for Swift V2
	if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; ok {
		cnsconfig.EnableSwiftV2 = true
		cnsconfig.WatchPods = true
		if nodeInfoErr := createOrUpdateNodeInfoCRD(ctx, kubeConfig, node); nodeInfoErr != nil {
			return errors.Wrap(nodeInfoErr, "error creating or updating nodeinfo crd")
		}
	}

	// perform state migration from CNI in case CNS is set to manage the endpoint state and has an empty state
	if cnsconfig.EnableStateMigration && !httpRestServiceImplementation.EndpointStateStore.Exists() {
		if err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore); err != nil {
			return errors.Wrap(err, "failed to create CNS EndpointState From CNI")
		}
		// endpoint state needs to be loaded in memory so that subsequent Delete calls remove the state and release the IPs.
		if err = httpRestServiceImplementation.EndpointStateStore.Read(restserver.EndpointStoreKey, &httpRestServiceImplementation.EndpointState); err != nil {
			return errors.Wrap(err, "failed to restore endpoint state")
		}
	}

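	// build the provider that maps existing Pod IP assignments; which backing source is used
	// (e.g. CNI state, the CNS endpoint store, or the apiserver) is resolved inside
	// getPodInfoByIPProvider based on the CNS configuration.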
	podInfoByIPProvider, err := getPodInfoByIPProvider(ctx, cnsconfig, httpRestServiceImplementation, clientset, nodeName)
	if err != nil {
		return errors.Wrap(err, "failed to initialize ip state")
	}

	// create scoped kube clients.
	directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme})
	if err != nil {
		return errors.Wrap(err, "failed to create ctrl client")
	}
	directnnccli := nodenetworkconfig.NewClient(directcli)
	// TODO(rbtr): nodename and namespace should be in the cns config
	directscopedcli := nncctrl.NewScopedClient(directnnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})

	logger.Printf("Reconciling initial CNS state")
	// the apiserver NNC might not be registered yet, or the apiserver might be down, and crashloop backoff would put us
	// outside the 5-10 minutes we have for AKS addons to come up, so retry a bit more aggressively here.
	// this will retry 10 times, maxing out at a one-minute delay, and takes about 8 minutes before giving up.
	attempt := 0
	err = retry.Do(func() error {
		attempt++
		logger.Printf("reconciling initial CNS state attempt: %d", attempt)
		err = reconcileInitialCNSState(ctx, directscopedcli, httpRestServiceImplementation, podInfoByIPProvider)
		if err != nil {
			logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err)
		}
		return errors.Wrap(err, "failed to initialize CNS state")
	}, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute))
	if err != nil {
		return err
	}
	logger.Printf("reconciled initial CNS state after %d attempts", attempt)

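	// register every API type the manager's cache and clients must understand:
	// core v1 (Pods), NodeNetworkConfig, ClusterSubnetState, and MultitenantPodNetworkConfig.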
	scheme := kuberuntime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil { //nolint:govet // intentional shadow
		return errors.Wrap(err, "failed to add corev1 to scheme")
	}
	if err = v1alpha.AddToScheme(scheme); err != nil {
		return errors.Wrap(err, "failed to add nodenetworkconfig/v1alpha to scheme")
	}
	if err = cssv1alpha1.AddToScheme(scheme); err != nil {
		return errors.Wrap(err, "failed to add clustersubnetstate/v1alpha1 to scheme")
	}
	if err = mtv1alpha1.AddToScheme(scheme); err != nil {
		return errors.Wrap(err, "failed to add multitenantpodnetworkconfig/v1alpha1 to scheme")
	}

	// Set Selector options on the Manager cache, which are used
	// to perform *server-side* filtering of the cached objects. This is very important
	// for high node/pod count clusters, as it keeps us from watching objects at the
	// whole-cluster scope when we are only interested in this Node's scope.
	cacheOpts := cache.Options{
		Scheme: scheme,
		ByObject: map[client.Object]cache.ByObject{
			&v1alpha.NodeNetworkConfig{}: {
				Namespaces: map[string]cache.Config{
					"kube-system": {FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName})},
				},
			},
		},
	}

	if cnsconfig.WatchPods {
		cacheOpts.ByObject[&corev1.Pod{}] = cache.ByObject{
			Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}),
		}
	}

	if cnsconfig.EnableSubnetScarcity {
		cacheOpts.ByObject[&cssv1alpha1.ClusterSubnetState{}] = cache.ByObject{
			Namespaces: map[string]cache.Config{
				"kube-system": {},
			},
		}
	}

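	// assemble the Manager options: the node-scoped cache configured above, and a metrics
	// BindAddress of "0", which disables the controller-runtime metrics endpoint.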
	managerOpts := ctrlmgr.Options{
		Scheme:  scheme,
		Metrics: ctrlmetrics.Options{BindAddress: "0"},
		Cache:   cacheOpts,
		Logger:  ctrlzap.New(),
	}

	manager, err := ctrl.NewManager(kubeConfig, managerOpts)
	if err != nil {
		return errors.Wrap(err, "failed to create manager")
	}

	// this cachedscopedclient is built using the Manager's cached client, which is
	// NOT SAFE TO USE UNTIL THE MANAGER IS STARTED!
	// This is okay because it is only used to build the IPAMPoolMonitor, which does not
	// attempt to use the client until it has received a NodeNetworkConfig to update, and
	// that can only happen once the Manager has started and the NodeNetworkConfig
	// reconciler has pushed the Monitor a NodeNetworkConfig.
	cachedscopedcli := nncctrl.NewScopedClient(nodenetworkconfig.NewClient(manager.GetClient()), types.NamespacedName{Namespace: "kube-system", Name: nodeName})

	// Build the IPAM Pool monitor
	var poolMonitor cns.IPAMPoolMonitor
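	// cssCh carries ClusterSubnetState updates from the CSS reconciler to the pool monitor;
	// ipDemandCh carries pod IP demand from the pod watcher to the IPAMv2 monitor.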
	cssCh := make(chan cssv1alpha1.ClusterSubnetState)
	ipDemandCh := make(chan int)
	if cnsconfig.EnableIPAMv2 {
		cssSrc := func(context.Context) ([]cssv1alpha1.ClusterSubnetState, error) { return nil, nil }
		if cnsconfig.EnableSubnetScarcity {
			cssSrc = clustersubnetstate.NewClient(manager.GetClient()).List
		}
		nncCh := make(chan v1alpha.NodeNetworkConfig)
		pmv2 := ipampoolv2.NewMonitor(z, httpRestServiceImplementation, cachedscopedcli, ipDemandCh, nncCh, cssCh)
		obs := metrics.NewLegacyMetricsObserver(httpRestService.GetPodIPConfigState, cachedscopedcli.Get, cssSrc)
		pmv2.WithLegacyMetricsObserver(obs)
		poolMonitor = pmv2.AsV1(nncCh)
	} else {
		poolOpts := ipampool.Options{
			RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
		}
		poolMonitor = ipampool.NewMonitor(httpRestServiceImplementation, cachedscopedcli, cssCh, &poolOpts)
	}

	// Start building the NNC Reconciler

	// get this Node's IP so the reconciler can compare it with each NC's node IP and confirm the NCs were created for this node
	nodeIP := configuration.NodeIP()
	nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, poolMonitor, nodeIP)
	// pass Node to the Reconciler for Controller xref
	// IPAMv1 - reconcile only status changes (where generation doesn't change).
	// IPAMv2 - reconcile all updates.
	filterGenerationChange := !cnsconfig.EnableIPAMv2
	if err := nncReconciler.SetupWithManager(manager, node, filterGenerationChange); err != nil { //nolint:govet // intentional shadow
		return errors.Wrapf(err, "failed to setup nnc reconciler with manager")
	}

	if cnsconfig.EnableSubnetScarcity {
		// ClusterSubnetState reconciler
		cssReconciler := cssctrl.New(cssCh)
		if err := cssReconciler.SetupWithManager(manager); err != nil {
			return errors.Wrapf(err, "failed to setup css reconciler with manager")
		}
	}

	// TODO: add pod listeners based on Swift V1 vs MT/V2 configuration
	if cnsconfig.WatchPods {
		pw := podctrl.New(z)
		if cnsconfig.EnableIPAMv2 {
			hostNetworkListOpt := &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.hostNetwork": "false"})} // filter only podsubnet pods
			// don't relist pods more than every 500ms
			limit := rate.NewLimiter(rate.Every(500*time.Millisecond), 1) //nolint:gomnd // clearly 500ms
			pw.With(pw.NewNotifierFunc(hostNetworkListOpt, limit, ipampoolv2.PodIPDemandListener(ipDemandCh)))
		}
		if err := pw.SetupWithManager(ctx, manager); err != nil {
			return errors.Wrapf(err, "failed to setup pod watcher with manager")
		}
	}

	if cnsconfig.EnableSwiftV2 {
		if err := mtpncctrl.SetupWithManager(manager); err != nil {
			return errors.Wrapf(err, "failed to setup mtpnc reconciler with manager")
		}
		// if SWIFT v2 is enabled on CNS, attach multitenant middleware to rest service
		// switch here for AKS(K8s) swiftv2 middleware to process IP configs requests
		swiftV2Middleware := &middlewares.K8sSWIFTv2Middleware{Cli: manager.GetClient()}
		httpRestService.AttachIPConfigsHandlerMiddleware(swiftV2Middleware)
	}

	// start the pool Monitor before the Reconciler, since it needs to be ready to receive a
	// NodeNetworkConfig update by the time the Reconciler tries to send it.
	go func() {
		logger.Printf("Starting IPAM Pool Monitor")
		if e := poolMonitor.Start(ctx); e != nil {
			logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
		}
	}()
	logger.Printf("initialized and started IPAM pool monitor")

	// Start the Manager which starts the reconcile loop.
	// The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the
	// Monitor's internal loop.
	go func() {
		logger.Printf("Starting controller-manager.")
		for {
			if err := manager.Start(ctx); err != nil {
				logger.Errorf("Failed to start controller-manager: %v", err)
				// retry starting the controller-manager and
				// increment the managerStartFailures metric to track the failure
				managerStartFailures.Inc()
			} else {
				logger.Printf("Stopped controller-manager.")
				return
			}
			time.Sleep(time.Second) // TODO(rbtr): make this exponential backoff
		}
	}()
	logger.Printf("Initialized controller-manager.")
	for {
		logger.Printf("Waiting for NodeNetworkConfig reconciler to start.")
		// wait for the Reconciler to run once on an NNC that was made for this Node.
		// the nncReadyCtx has a timeout of 15 minutes, after which we consider the
		// NNC Reconciler stuck or failed, log the error, and retry.
		nncReadyCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) // nolint // it will time out and not leak
		if started, err := nncReconciler.Started(nncReadyCtx); !started {
			logger.Errorf("NNC reconciler has not started, does the NNC exist? err: %v", err)
			nncReconcilerStartFailures.Inc()
			continue
		}
		logger.Printf("NodeNetworkConfig reconciler has started.")
		cancel()
		break
	}

	go func() {
		logger.Printf("Starting SyncHostNCVersion loop.")
		// periodically poll the VFP-programmed NC version from NMAgent
		tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond)
		for {
			select {
			case <-tickerChannel:
				timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond)
				httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode)
				cancel()
			case <-ctx.Done():
				logger.Printf("Stopping SyncHostNCVersion loop.")
				return
			}
		}
	}()
	logger.Printf("Initialized SyncHostNCVersion loop.")
	return nil
}
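
For context, a minimal sketch of how this function might be invoked during CNS startup. The surrounding wiring, variable names, and the CRD channel-mode check below are assumptions based on this excerpt, not the actual main.go plumbing:

	// hypothetical caller sketch -- illustrative only
	rootCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if cnsconfig.ChannelMode == cns.CRD {
		if err := InitializeCRDState(rootCtx, httpRestService, cnsconfig); err != nil {
			logger.Errorf("Failed to start CRD Controller: %v", err)
			return
		}
	}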