in cns/service/main.go [1375:1662]
func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error {
// convert interface type to implementation type
httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService)
if !ok {
logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService)
return fmt.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v",
httpRestService)
}
// Set orchestrator type
orchestrator := cns.SetOrchestratorTypeRequest{
OrchestratorType: cns.KubernetesCRD,
}
httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator)
// build default clientset.
kubeConfig, err := ctrl.GetConfig()
if err != nil {
logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err)
return errors.Wrap(err, "failed to get kubeconfig")
}
kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version)
clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return errors.Wrap(err, "failed to build clientset")
}
// get nodename for scoping kube requests to node.
nodeName, err := configuration.NodeName()
if err != nil {
return errors.Wrap(err, "failed to get NodeName")
}
node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get node %s", nodeName)
}
// check the Node labels for Swift V2
if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; ok {
cnsconfig.EnableSwiftV2 = true
cnsconfig.WatchPods = true
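// publish this node's info via the NodeInfo CRD so the multitenant control plane can discover and program
// SwiftV2 networking for this node (an assumption about the CRD's purpose, based on its use here).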
if nodeInfoErr := createOrUpdateNodeInfoCRD(ctx, kubeConfig, node); nodeInfoErr != nil {
return errors.Wrap(nodeInfoErr, "error creating or updating nodeinfo crd")
}
}
// perform state migration from CNI in case CNS is set to manage the endpoint state and has an empty state
if cnsconfig.EnableStateMigration && !httpRestServiceImplementation.EndpointStateStore.Exists() {
if err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore); err != nil {
return errors.Wrap(err, "failed to create CNS EndpointState From CNI")
}
// endpoint state needs to be loaded in memory so the subsequent Delete calls remove the state and release the IPs.
if err = httpRestServiceImplementation.EndpointStateStore.Read(restserver.EndpointStoreKey, &httpRestServiceImplementation.EndpointState); err != nil {
return errors.Wrap(err, "failed to restore endpoint state")
}
}
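// the provider supplies the set of Pods that currently hold IPs (sourced from the CNI state, the apiserver, or
// the CNS endpoint store, depending on configuration) so the initial reconcile below can rebuild IP assignment state.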
podInfoByIPProvider, err := getPodInfoByIPProvider(ctx, cnsconfig, httpRestServiceImplementation, clientset, nodeName)
if err != nil {
return errors.Wrap(err, "failed to initialize ip state")
}
// create scoped kube clients.
directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme})
if err != nil {
return errors.Wrap(err, "failed to create ctrl client")
}
directnnccli := nodenetworkconfig.NewClient(directcli)
// TODO(rbtr): nodename and namespace should be in the cns config
directscopedcli := nncctrl.NewScopedClient(directnnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})
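// the direct (uncached) clients above are used for the initial reconcile because the Manager's cached client
// cannot be used until the Manager has started, which happens further below.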
logger.Printf("Reconciling initial CNS state")
// the apiserver NNC might not be registered yet, or the apiserver might be down, and crash-loop backoff would put us
// outside the 5-10 minutes we have for AKS addons to come up, so retry a bit more aggressively here.
// this will retry 10 times, maxing out at a one-minute delay, taking about 8 minutes before giving up.
attempt := 0
err = retry.Do(func() error {
attempt++
logger.Printf("reconciling initial CNS state attempt: %d", attempt)
err = reconcileInitialCNSState(ctx, directscopedcli, httpRestServiceImplementation, podInfoByIPProvider)
if err != nil {
logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err)
}
return errors.Wrap(err, "failed to initialize CNS state")
}, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute))
if err != nil {
return err
}
logger.Printf("reconciled initial CNS state after %d attempts", attempt)
scheme := kuberuntime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil { //nolint:govet // intentional shadow
return errors.Wrap(err, "failed to add corev1 to scheme")
}
if err = v1alpha.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "failed to add nodenetworkconfig/v1alpha to scheme")
}
if err = cssv1alpha1.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "failed to add clustersubnetstate/v1alpha1 to scheme")
}
if err = mtv1alpha1.AddToScheme(scheme); err != nil {
return errors.Wrap(err, "failed to add multitenantpodnetworkconfig/v1alpha1 to scheme")
}
// Set Selector options on the Manager cache which are used
// to perform *server-side* filtering of the cached objects. This is very important
// for high node/pod count clusters, as it keeps us from watching objects at the
// whole cluster scope when we are only interested in the Node's scope.
cacheOpts := cache.Options{
Scheme: scheme,
ByObject: map[client.Object]cache.ByObject{
&v1alpha.NodeNetworkConfig{}: {
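// an NNC shares its Node's name, so restrict the cache to the single NNC for this node in kube-system.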
Namespaces: map[string]cache.Config{
"kube-system": {FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName})},
},
},
},
}
if cnsconfig.WatchPods {
cacheOpts.ByObject[&corev1.Pod{}] = cache.ByObject{
Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}),
}
}
if cnsconfig.EnableSubnetScarcity {
cacheOpts.ByObject[&cssv1alpha1.ClusterSubnetState{}] = cache.ByObject{
Namespaces: map[string]cache.Config{
"kube-system": {},
},
}
}
managerOpts := ctrlmgr.Options{
Scheme: scheme,
Metrics: ctrlmetrics.Options{BindAddress: "0"},
Cache: cacheOpts,
Logger: ctrlzap.New(),
}
manager, err := ctrl.NewManager(kubeConfig, managerOpts)
if err != nil {
return errors.Wrap(err, "failed to create manager")
}
// this cachedscopedclient is built using the Manager's cached client, which is
// NOT SAFE TO USE UNTIL THE MANAGER IS STARTED!
// This is okay because it is only used to build the IPAMPoolMonitor, which does not
// attempt to use the client until it has received a NodeNetworkConfig to update, and
// that can only happen once the Manager has started and the NodeNetworkConfig
// reconciler has pushed the Monitor a NodeNetworkConfig.
cachedscopedcli := nncctrl.NewScopedClient(nodenetworkconfig.NewClient(manager.GetClient()), types.NamespacedName{Namespace: "kube-system", Name: nodeName})
// Build the IPAM Pool monitor
var poolMonitor cns.IPAMPoolMonitor
cssCh := make(chan cssv1alpha1.ClusterSubnetState)
ipDemandCh := make(chan int)
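// cssCh carries ClusterSubnetState updates from the CSS reconciler to the pool monitor; ipDemandCh carries the
// per-node pod IP demand from the pod watcher to the v2 pool monitor (wired up only when IPAMv2 is enabled).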
if cnsconfig.EnableIPAMv2 {
cssSrc := func(context.Context) ([]cssv1alpha1.ClusterSubnetState, error) { return nil, nil }
if cnsconfig.EnableSubnetScarcity {
cssSrc = clustersubnetstate.NewClient(manager.GetClient()).List
}
nncCh := make(chan v1alpha.NodeNetworkConfig)
pmv2 := ipampoolv2.NewMonitor(z, httpRestServiceImplementation, cachedscopedcli, ipDemandCh, nncCh, cssCh)
obs := metrics.NewLegacyMetricsObserver(httpRestService.GetPodIPConfigState, cachedscopedcli.Get, cssSrc)
pmv2.WithLegacyMetricsObserver(obs)
poolMonitor = pmv2.AsV1(nncCh)
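// AsV1 adapts the v2 monitor to the v1 IPAMPoolMonitor interface; NNC updates received through that interface
// are presumably forwarded to the v2 monitor over nncCh.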
} else {
poolOpts := ipampool.Options{
RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
}
poolMonitor = ipampool.NewMonitor(httpRestServiceImplementation, cachedscopedcli, cssCh, &poolOpts)
}
// Start building the NNC Reconciler
// get the CNS Node IP so the Reconciler can compare it against each NC's Node IP and ensure the NCs were created for this node
nodeIP := configuration.NodeIP()
nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, poolMonitor, nodeIP)
// pass Node to the Reconciler for Controller xref
// IPAMv1 - reconcile only status changes (where generation doesn't change).
// IPAMv2 - reconcile all updates.
filterGenerationChange := !cnsconfig.EnableIPAMv2
if err := nncReconciler.SetupWithManager(manager, node, filterGenerationChange); err != nil { //nolint:govet // intentional shadow
return errors.Wrapf(err, "failed to setup nnc reconciler with manager")
}
if cnsconfig.EnableSubnetScarcity {
// ClusterSubnetState reconciler
cssReconciler := cssctrl.New(cssCh)
if err := cssReconciler.SetupWithManager(manager); err != nil {
return errors.Wrapf(err, "failed to setup css reconciler with manager")
}
}
// TODO: add pod listeners based on Swift V1 vs MT/V2 configuration
if cnsconfig.WatchPods {
pw := podctrl.New(z)
if cnsconfig.EnableIPAMv2 {
hostNetworkListOpt := &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.hostNetwork": "false"})} // filter only podsubnet pods
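// host-network pods get their IPs from the node itself rather than the pod subnet, so they are excluded from
// the IP demand count.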
// don't relist pods more than every 500ms
limit := rate.NewLimiter(rate.Every(500*time.Millisecond), 1) //nolint:gomnd // clearly 500ms
pw.With(pw.NewNotifierFunc(hostNetworkListOpt, limit, ipampoolv2.PodIPDemandListener(ipDemandCh)))
}
if err := pw.SetupWithManager(ctx, manager); err != nil {
return errors.Wrapf(err, "failed to setup pod watcher with manager")
}
}
if cnsconfig.EnableSwiftV2 {
if err := mtpncctrl.SetupWithManager(manager); err != nil {
return errors.Wrapf(err, "failed to setup mtpnc reconciler with manager")
}
// if SwiftV2 is enabled on CNS, attach the multitenant middleware to the rest service.
// the AKS (K8s) SwiftV2 middleware is selected here to process IPConfigs requests.
swiftV2Middleware := &middlewares.K8sSWIFTv2Middleware{Cli: manager.GetClient()}
httpRestService.AttachIPConfigsHandlerMiddleware(swiftV2Middleware)
}
// start the pool Monitor before the Reconciler, since it needs to be ready to receive a
// NodeNetworkConfig update by the time the Reconciler tries to send it.
go func() {
logger.Printf("Starting IPAM Pool Monitor")
if e := poolMonitor.Start(ctx); e != nil {
logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
}
}()
logger.Printf("initialized and started IPAM pool monitor")
// Start the Manager which starts the reconcile loop.
// The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the
// Monitor's internal loop.
go func() {
logger.Printf("Starting controller-manager.")
for {
if err := manager.Start(ctx); err != nil {
logger.Errorf("Failed to start controller-manager: %v", err)
// retry starting the controller-manager and
// increment the managerStartFailures metric for failure tracking
managerStartFailures.Inc()
} else {
logger.Printf("Stopped controller-manager.")
return
}
time.Sleep(time.Second) // TODO(rbtr): make this exponential backoff
}
}()
logger.Printf("Initialized controller-manager.")
for {
logger.Printf("Waiting for NodeNetworkConfig reconciler to start.")
// wait for the Reconciler to run once on an NNC that was made for this Node.
// the nncReadyCtx has a timeout of 15 minutes, after which we consider the NNC
// Reconciler stuck/failed, log it, and retry.
nncReadyCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) // nolint // it will time out and not leak
if started, err := nncReconciler.Started(nncReadyCtx); !started {
logger.Errorf("NNC reconciler has not started, does the NNC exist? err: %v", err)
nncReconcilerStartFailures.Inc()
continue
}
logger.Printf("NodeNetworkConfig reconciler has started.")
cancel()
break
}
go func() {
logger.Printf("Starting SyncHostNCVersion loop.")
// Periodically poll the VFP-programmed NC version from NMAgent
tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond)
for {
select {
case <-tickerChannel:
timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond)
httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode)
cancel()
case <-ctx.Done():
logger.Printf("Stopping SyncHostNCVersion loop.")
return
}
}
}()
logger.Printf("Initialized SyncHostNCVersion loop.")
return nil
}