in pilot/pkg/serviceregistry/kube/controller/controller.go [283:396]
// NewController builds a Controller for the cluster reachable through
// kubeClient, configured by options.
//
// It allocates the controller's internal maps and indexes, installs the
// hostNamesForNamespacedName / servicesForNamespacedName lookups (which also
// cover the hostname from serviceClusterSetLocalHostname when
// features.EnableMCSHost is set), and wires up the namespace, service,
// endpoint, node and pod informers together with the service export/import
// caches.
//
// If options carries no DiscoveryNamespacesFilter, one is created from the
// mesh's DiscoverySelectors and stored on the controller's copy of the options.
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:                       options,
		client:                     kubeClient,
		queue:                      queue.NewQueueWithID(time.Second, string(options.ClusterID)),
		servicesMap:                make(map[host.Name]*model.Service),
		nodeSelectorsForServices:   make(map[host.Name]labels.Instance),
		nodeInfoMap:                make(map[string]kubernetesNode),
		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
		workloadInstancesIndex:     workloadinstances.NewIndex(),
		informerInit:               atomic.NewBool(false),
		beginSync:                  atomic.NewBool(false),
		initialSync:                atomic.NewBool(false),
		multinetwork:               initMultinetwork(),
	}

	// localHostname is the hostname kube.ServiceHostname derives for name under
	// the controller's domain suffix. The suffix is read on every call rather
	// than captured once.
	localHostname := func(name types.NamespacedName) host.Name {
		return kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)
	}

	if !features.EnableMCSHost {
		// Without MCS host support each service is known under a single hostname.
		c.hostNamesForNamespacedName = func(name types.NamespacedName) []host.Name {
			return []host.Name{localHostname(name)}
		}
		c.servicesForNamespacedName = func(name types.NamespacedName) []*model.Service {
			svc := c.GetService(localHostname(name))
			if svc == nil {
				return nil
			}
			return []*model.Service{svc}
		}
	} else {
		// With MCS host support a service may also be registered under the
		// hostname produced by serviceClusterSetLocalHostname.
		mcsHostnames := func(name types.NamespacedName) []host.Name {
			return []host.Name{
				localHostname(name),
				serviceClusterSetLocalHostname(name),
			}
		}
		c.hostNamesForNamespacedName = mcsHostnames
		c.servicesForNamespacedName = func(name types.NamespacedName) []*model.Service {
			found := make([]*model.Service, 0, 2)
			// Both lookups happen under a single read lock on the controller.
			c.RLock()
			defer c.RUnlock()
			for _, hostname := range mcsHostnames(name) {
				if svc := c.servicesMap[hostname]; svc != nil {
					found = append(found, svc)
				}
			}
			return found
		}
	}

	namespaces := kubeClient.KubeInformer().Core().V1().Namespaces()
	c.nsInformer = namespaces.Informer()
	c.nsLister = namespaces.Lister()
	if c.opts.SystemNamespace != "" {
		// Only events for the configured system namespace reach
		// onSystemNamespaceEvent; anything that is not a *v1.Namespace is
		// logged and dropped.
		isSystemNamespace := func(obj interface{}) bool {
			if namespace, isNamespace := obj.(*v1.Namespace); isNamespace {
				return namespace.Name == c.opts.SystemNamespace
			}
			log.Warnf("Namespace watch getting wrong type in event: %T", obj)
			return false
		}
		systemNsInformer := filter.NewFilteredSharedIndexInformer(isSystemNamespace, c.nsInformer)
		c.registerHandlers(systemNsInformer, "Namespaces", c.onSystemNamespaceEvent, nil)
	}

	// Fall back to a filter built from the mesh's discovery selectors when the
	// caller did not supply one.
	if c.opts.DiscoveryNamespacesFilter == nil {
		selectors := options.MeshWatcher.Mesh().DiscoverySelectors
		c.opts.DiscoveryNamespacesFilter = filter.NewDiscoveryNamespacesFilter(c.nsLister, selectors)
	}
	c.initDiscoveryHandlers(kubeClient, options.EndpointMode, options.MeshWatcher, c.opts.DiscoveryNamespacesFilter)

	allServices := kubeClient.KubeInformer().Core().V1().Services().Informer()
	c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, allServices)
	c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer())
	c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)

	// Pick the endpoint source matching the requested mode; any other mode
	// leaves c.endpoints unset.
	if mode := options.EndpointMode; mode == EndpointsOnly {
		c.endpoints = newEndpointsController(c)
	} else if mode == EndpointSliceOnly {
		c.endpoints = newEndpointSliceController(c)
	}

	// Nodes are watched in order to get the node IPs of a selected set of nodes.
	nodes := kubeClient.KubeInformer().Core().V1().Nodes()
	c.nodeInformer = nodes.Informer()
	c.nodeLister = nodes.Lister()
	c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)

	allPods := kubeClient.KubeInformer().Core().V1().Pods().Informer()
	filteredPods := filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, allPods)
	// replayEndpoint is the callback handed to the pod cache. It looks the key
	// up in the endpoint informer's store and, if the object is present and
	// shouldEnqueue allows it, queues the object as an update event for the
	// endpoints controller.
	// NOTE(review): when the pod cache invokes this callback is decided inside
	// newPodCache, which is not visible here.
	replayEndpoint := func(key string) {
		obj, found, err := c.endpoints.getInformer().GetIndexer().GetByKey(key)
		switch {
		case err != nil:
			log.Debugf("Endpoint %v lookup failed with error %v, skipping stale endpoint", key, err)
		case !found:
			log.Debugf("Endpoint %v not found, skipping stale endpoint", key)
		case shouldEnqueue("Pods", c.beginSync):
			c.queue.Push(func() error {
				return c.endpoints.onEvent(obj, model.EventUpdate)
			})
		}
	}
	c.pods = newPodCache(c, filteredPods, replayEndpoint)
	c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)

	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)
	return c
}