func NewController()

in pkg/neg/controller.go [111:324]


// NewController builds a NEG Controller and wires it to the supplied informers.
//
// It performs the following steps, in order:
//  1. Creates an event broadcaster/recorder (component "neg-controller") whose
//     scheme contains the default client-go scheme plus the ServiceNetworkEndpointGroup
//     v1beta1 types. Failures to register a scheme are logged and construction continues.
//  2. Creates the syncer manager, handing it either the Endpoints indexer or the
//     EndpointSlice indexer depending on enableEndpointSlices (the other is left nil).
//  3. Creates the readiness reflector (a no-op implementation when
//     enableReadinessReflector is false) and attaches it to the manager.
//  4. Creates the Controller with three rate-limited work queues (service,
//     endpoint, node).
//  5. Registers event handlers:
//     - ingress and pod handlers only when runIngress is true;
//     - service handlers always;
//     - EndpointSlice handlers when enableEndpointSlices is true, otherwise Endpoints handlers;
//     - node add/delete handlers always, plus a node update handler when runL4Controller is true;
//     - DestinationRule handlers (and the related controller fields) only when enableAsm is true.
//
// The returned Controller is not started by this function.
func NewController(
	kubeClient kubernetes.Interface,
	svcNegClient svcnegclient.Interface,
	destinationRuleClient dynamic.NamespaceableResourceInterface,
	kubeSystemUID types.UID,
	ingressInformer cache.SharedIndexInformer,
	serviceInformer cache.SharedIndexInformer,
	podInformer cache.SharedIndexInformer,
	nodeInformer cache.SharedIndexInformer,
	endpointInformer cache.SharedIndexInformer,
	endpointSliceInformer cache.SharedIndexInformer,
	destinationRuleInformer cache.SharedIndexInformer,
	svcNegInformer cache.SharedIndexInformer,
	hasSynced func() bool,
	controllerMetrics *usage.ControllerMetrics,
	l4Namer namer2.L4ResourcesNamer,
	defaultBackendService utils.ServicePort,
	cloud negtypes.NetworkEndpointGroupCloud,
	zoneGetter negtypes.ZoneGetter,
	namer negtypes.NetworkEndpointGroupNamer,
	resyncPeriod time.Duration,
	gcPeriod time.Duration,
	enableReadinessReflector bool,
	runIngress bool,
	runL4Controller bool,
	enableNonGcpMode bool,
	enableAsm bool,
	asmServiceNEGSkipNamespaces []string,
	enableEndpointSlices bool,
) *Controller {
	// init event recorder
	// TODO: move event recorder initializer to main. Reuse it among controllers.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
		Interface: kubeClient.CoreV1().Events(""),
	})
	negScheme := runtime.NewScheme()
	err := scheme.AddToScheme(negScheme)
	if err != nil {
		klog.Errorf("Errored adding default scheme to event recorder: %q", err)
	}
	err = svcnegv1beta1.AddToScheme(negScheme)
	if err != nil {
		klog.Errorf("Errored adding NEG CRD scheme to event recorder: %q", err)
	}
	recorder := eventBroadcaster.NewRecorder(negScheme,
		apiv1.EventSource{Component: "neg-controller"})

	// Only the indexer matching the selected endpoint API is populated; the
	// other one is intentionally left nil and passed through as such.
	var endpointIndexer, endpointSliceIndexer cache.Indexer
	if enableEndpointSlices {
		endpointSliceIndexer = endpointSliceInformer.GetIndexer()
	} else {
		endpointIndexer = endpointInformer.GetIndexer()
	}

	manager := newSyncerManager(
		namer,
		recorder,
		cloud,
		zoneGetter,
		svcNegClient,
		kubeSystemUID,
		podInformer.GetIndexer(),
		serviceInformer.GetIndexer(),
		endpointIndexer,
		endpointSliceIndexer,
		nodeInformer.GetIndexer(),
		svcNegInformer.GetIndexer(),
		enableNonGcpMode,
		enableEndpointSlices)

	// The readiness reflector takes the manager as an argument, so it can only
	// be attached to the manager after both have been constructed.
	var reflector readiness.Reflector
	if enableReadinessReflector {
		reflector = readiness.NewReadinessReflector(
			kubeClient,
			podInformer.GetIndexer(),
			cloud, manager)
	} else {
		reflector = &readiness.NoopReflector{}
	}
	manager.reflector = reflector

	negController := &Controller{
		client:                kubeClient,
		manager:               manager,
		resyncPeriod:          resyncPeriod,
		gcPeriod:              gcPeriod,
		recorder:              recorder,
		zoneGetter:            zoneGetter,
		namer:                 namer,
		l4Namer:               l4Namer,
		defaultBackendService: defaultBackendService,
		hasSynced:             hasSynced,
		ingressLister:         ingressInformer.GetIndexer(),
		serviceLister:         serviceInformer.GetIndexer(),
		serviceQueue:          workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		endpointQueue:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		nodeQueue:             workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		syncTracker:           utils.NewTimeTracker(),
		reflector:             reflector,
		collector:             controllerMetrics,
		runL4:                 runL4Controller,
	}

	// unwrapTombstone returns the object carried by a
	// cache.DeletedFinalStateUnknown tombstone, or obj itself when it is not a
	// tombstone. Informers deliver a tombstone to delete handlers when the
	// delete event itself was missed, so delete handlers must not assume the
	// concrete resource type.
	unwrapTombstone := func(obj interface{}) interface{} {
		if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
			return tombstone.Obj
		}
		return obj
	}

	if runIngress {
		ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				addIng := obj.(*v1.Ingress)
				if !utils.IsGLBCIngress(addIng) {
					klog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", common.NamespacedName(addIng), annotations.IngressClassKey)
					return
				}
				negController.enqueueIngressServices(addIng)
			},
			DeleteFunc: func(obj interface{}) {
				// Checked assertion: a tombstone or unexpected type must not
				// panic the controller.
				delIng, ok := unwrapTombstone(obj).(*v1.Ingress)
				if !ok {
					klog.Errorf("Ignoring delete for ingress: unexpected object type %T", obj)
					return
				}
				if !utils.IsGLBCIngress(delIng) {
					klog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", common.NamespacedName(delIng), annotations.IngressClassKey)
					return
				}
				negController.enqueueIngressServices(delIng)
			},
			UpdateFunc: func(old, cur interface{}) {
				oldIng := old.(*v1.Ingress)
				curIng := cur.(*v1.Ingress)
				// Check if ingress class changed and previous class was a GCE ingress
				// Ingress class change may require cleanup so enqueue related services
				if !utils.IsGLBCIngress(curIng) && !utils.IsGLBCIngress(oldIng) {
					klog.V(4).Infof("Ignoring update for ingress %v based on annotation %v", common.NamespacedName(curIng), annotations.IngressClassKey)
					return
				}
				// Enqueue services referenced by either version so that
				// services dropped from the ingress are processed too.
				keys := gatherIngressServiceKeys(oldIng)
				keys = keys.Union(gatherIngressServiceKeys(curIng))
				for _, key := range keys.List() {
					negController.enqueueService(cache.ExplicitKey(key))
				}
			},
		})

		podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				pod := obj.(*apiv1.Pod)
				negController.reflector.SyncPod(pod)
			},
			UpdateFunc: func(old, cur interface{}) {
				pod := cur.(*apiv1.Pod)
				negController.reflector.SyncPod(pod)
			},
		})
	}
	serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    negController.enqueueService,
		DeleteFunc: negController.enqueueService,
		UpdateFunc: func(old, cur interface{}) {
			negController.enqueueService(cur)
		},
	})
	if enableEndpointSlices {
		endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    negController.enqueueEndpointSlice,
			DeleteFunc: negController.enqueueEndpointSlice,
			UpdateFunc: func(old, cur interface{}) {
				negController.enqueueEndpointSlice(cur)
			},
		})
	} else {
		endpointInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    negController.enqueueEndpoint,
			DeleteFunc: negController.enqueueEndpoint,
			UpdateFunc: func(old, cur interface{}) {
				negController.enqueueEndpoint(cur)
			},
		})
	}

	nodeEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			node := obj.(*apiv1.Node)
			negController.enqueueNode(node)
		},
		DeleteFunc: func(obj interface{}) {
			// Checked assertion: a tombstone or unexpected type must not
			// panic the controller.
			node, ok := unwrapTombstone(obj).(*apiv1.Node)
			if !ok {
				klog.Errorf("Ignoring delete for node: unexpected object type %T", obj)
				return
			}
			negController.enqueueNode(node)
		},
	}

	// Node updates are only watched when the L4 controller is running, and
	// only enqueue when the candidate-node predicate result flips between the
	// old and the current node object.
	if negController.runL4 {
		nodeEventHandler.UpdateFunc = func(old, cur interface{}) {
			oldNode := old.(*apiv1.Node)
			currentNode := cur.(*apiv1.Node)
			candidateNodeCheck := utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
			if candidateNodeCheck(oldNode) != candidateNodeCheck(currentNode) {
				klog.Infof("Node %q has changed, enqueueing", currentNode.Name)
				negController.enqueueNode(currentNode)
			}
		}
	}
	nodeInformer.AddEventHandler(nodeEventHandler)

	// ASM-related fields stay at their zero values unless enableAsm is set.
	if enableAsm {
		negController.enableASM = enableAsm
		negController.asmServiceNEGSkipNamespaces = asmServiceNEGSkipNamespaces
		negController.destinationRuleLister = destinationRuleInformer.GetIndexer()
		destinationRuleInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    negController.enqueueDestinationRule,
			DeleteFunc: negController.enqueueDestinationRule,
			UpdateFunc: func(old, cur interface{}) {
				negController.enqueueDestinationRule(cur)
			},
		})
		negController.destinationRuleClient = destinationRuleClient
	}
	return negController
}