func NewController()

in pilot/pkg/serviceregistry/kube/controller/controller.go [283:396]


// NewController builds a Controller backed by kubeClient and configured by
// options. It installs the hostname/service lookup functions, sets up the
// namespace, service, node and pod informers together with their event
// handlers, picks the endpoint controller that matches options.EndpointMode,
// and finally creates the service export and import caches.
//
// The statements run in a deliberate sequence (informer creation followed by
// handler registration); keep that sequence when editing.
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:                       options,
		client:                     kubeClient,
		queue:                      queue.NewQueueWithID(time.Second, string(options.ClusterID)),
		servicesMap:                make(map[host.Name]*model.Service),
		nodeSelectorsForServices:   make(map[host.Name]labels.Instance),
		nodeInfoMap:                make(map[string]kubernetesNode),
		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
		workloadInstancesIndex:     workloadinstances.NewIndex(),
		informerInit:               atomic.NewBool(false),
		beginSync:                  atomic.NewBool(false),
		initialSync:                atomic.NewBool(false),

		multinetwork: initMultinetwork(),
	}

	// kubeHostname derives the Kubernetes hostname for a namespaced name. The
	// domain suffix is read from c.opts on every call rather than captured once.
	kubeHostname := func(nn types.NamespacedName) host.Name {
		return kube.ServiceHostname(nn.Name, nn.Namespace, c.opts.DomainSuffix)
	}

	if !features.EnableMCSHost {
		// Without MCS hosts a namespaced name maps to exactly one hostname.
		c.hostNamesForNamespacedName = func(nn types.NamespacedName) []host.Name {
			return []host.Name{kubeHostname(nn)}
		}
		c.servicesForNamespacedName = func(nn types.NamespacedName) []*model.Service {
			svc := c.GetService(kubeHostname(nn))
			if svc == nil {
				// Unknown service: nil, not an empty slice.
				return nil
			}
			return []*model.Service{svc}
		}
	} else {
		// With MCS hosts a namespaced name maps to two hostnames: the Kubernetes
		// hostname and the one produced by serviceClusterSetLocalHostname.
		c.hostNamesForNamespacedName = func(nn types.NamespacedName) []host.Name {
			return []host.Name{
				kubeHostname(nn),
				serviceClusterSetLocalHostname(nn),
			}
		}
		c.servicesForNamespacedName = func(nn types.NamespacedName) []*model.Service {
			found := make([]*model.Service, 0, 2)
			// collect appends the service registered under hostname, if any.
			collect := func(hostname host.Name) {
				if svc := c.servicesMap[hostname]; svc != nil {
					found = append(found, svc)
				}
			}

			// Both lookups read servicesMap directly, so they happen under the read lock.
			c.RLock()
			collect(kubeHostname(nn))
			collect(serviceClusterSetLocalHostname(nn))
			c.RUnlock()

			return found
		}
	}

	c.nsInformer = kubeClient.KubeInformer().Core().V1().Namespaces().Informer()
	c.nsLister = kubeClient.KubeInformer().Core().V1().Namespaces().Lister()
	if c.opts.SystemNamespace != "" {
		// Only events for the configured system namespace pass this filter.
		isSystemNamespace := func(obj interface{}) bool {
			if ns, isNamespace := obj.(*v1.Namespace); isNamespace {
				return ns.Name == c.opts.SystemNamespace
			}
			log.Warnf("Namespace watch getting wrong type in event: %T", obj)
			return false
		}
		systemNsInformer := filter.NewFilteredSharedIndexInformer(isSystemNamespace, c.nsInformer)
		c.registerHandlers(systemNsInformer, "Namespaces", c.onSystemNamespaceEvent, nil)
	}

	// Fall back to a filter built from the mesh's discovery selectors when the
	// caller did not supply one.
	if c.opts.DiscoveryNamespacesFilter == nil {
		c.opts.DiscoveryNamespacesFilter = filter.NewDiscoveryNamespacesFilter(c.nsLister, options.MeshWatcher.Mesh().DiscoverySelectors)
	}

	c.initDiscoveryHandlers(kubeClient, options.EndpointMode, options.MeshWatcher, c.opts.DiscoveryNamespacesFilter)

	// Services are observed through the discovery-namespace filter.
	c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Services().Informer())
	c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer())

	c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)

	// Any other endpoint mode leaves c.endpoints unset.
	if mode := options.EndpointMode; mode == EndpointsOnly {
		c.endpoints = newEndpointsController(c)
	} else if mode == EndpointSliceOnly {
		c.endpoints = newEndpointSliceController(c)
	}

	// This is for getting the node IPs of a selected set of nodes
	c.nodeInformer = kubeClient.KubeInformer().Core().V1().Nodes().Informer()
	c.nodeLister = kubeClient.KubeInformer().Core().V1().Nodes().Lister()
	c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)

	podInformer := filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Pods().Informer())
	// requeueEndpoint is the callback handed to the pod cache: it looks the key up
	// in the endpoint informer's indexer and, if the item exists and enqueueing
	// is allowed, pushes an update event for it onto the controller queue.
	requeueEndpoint := func(key string) {
		item, exists, err := c.endpoints.getInformer().GetIndexer().GetByKey(key)
		switch {
		case err != nil:
			log.Debugf("Endpoint %v lookup failed with error %v, skipping stale endpoint", key, err)
			return
		case !exists:
			log.Debugf("Endpoint %v not found, skipping stale endpoint", key)
			return
		case !shouldEnqueue("Pods", c.beginSync):
			return
		}
		c.queue.Push(func() error {
			return c.endpoints.onEvent(item, model.EventUpdate)
		})
	}
	c.pods = newPodCache(c, podInformer, requeueEndpoint)
	c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)

	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)

	return c
}