pilot/pkg/serviceregistry/kube/controller/controller.go (1,003 lines of code) (raw):

// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
	"fmt"
	"net"
	"sort"
	"sync"
	"time"
)

import (
	"github.com/hashicorp/go-multierror"
	"go.uber.org/atomic"
	"istio.io/api/label"
	istiolog "istio.io/pkg/log"
	"istio.io/pkg/monitoring"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	listerv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

import (
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/features"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/aggregate"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/util/workloadinstances"
	"github.com/apache/dubbo-go-pixiu/pilot/pkg/util/informermetric"
	"github.com/apache/dubbo-go-pixiu/pkg/cluster"
	"github.com/apache/dubbo-go-pixiu/pkg/config/host"
	"github.com/apache/dubbo-go-pixiu/pkg/config/labels"
	"github.com/apache/dubbo-go-pixiu/pkg/config/mesh"
	"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
	kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube"
	"github.com/apache/dubbo-go-pixiu/pkg/network"
	"github.com/apache/dubbo-go-pixiu/pkg/queue"
)

const (
	// NodeRegionLabel is the well-known label for kubernetes node region in beta
	NodeRegionLabel = v1.LabelFailureDomainBetaRegion
	// NodeZoneLabel is the well-known label for kubernetes node zone in beta
	NodeZoneLabel = v1.LabelFailureDomainBetaZone
	// NodeRegionLabelGA is the well-known label for kubernetes node region in ga
	NodeRegionLabelGA = v1.LabelTopologyRegion
	// NodeZoneLabelGA is the well-known label for kubernetes node zone in ga
	NodeZoneLabelGA = v1.LabelTopologyZone

	// DefaultNetworkGatewayPort is the port used by default for cross-network traffic if not otherwise specified
	// by meshNetworks or "networking.istio.io/gatewayPort"
	DefaultNetworkGatewayPort = 15443
)

var log = istiolog.RegisterScope("kube", "kubernetes service registry controller", 0)

var (
	typeTag  = monitoring.MustCreateLabel("type")
	eventTag = monitoring.MustCreateLabel("event")

	k8sEvents = monitoring.NewSum(
		"pilot_k8s_reg_events",
		"Events from k8s registry.",
		monitoring.WithLabels(typeTag, eventTag),
	)

	// nolint: gocritic
	// This is deprecated in favor of `pilot_k8s_endpoints_pending_pod`, which is a gauge indicating the number of
	// currently missing pods. This helps distinguish transient errors from permanent ones.
	endpointsWithNoPods = monitoring.NewSum(
		"pilot_k8s_endpoints_with_no_pods",
		"Endpoints that does not have any corresponding pods.")

	endpointsPendingPodUpdate = monitoring.NewGauge(
		"pilot_k8s_endpoints_pending_pod",
		"Number of endpoints that do not currently have any corresponding pods.",
	)
)

func init() {
	monitoring.MustRegister(k8sEvents)
	monitoring.MustRegister(endpointsWithNoPods)
	monitoring.MustRegister(endpointsPendingPodUpdate)
}

func incrementEvent(kind, event string) {
	k8sEvents.With(typeTag.Value(kind), eventTag.Value(event)).Increment()
}

// Options stores the configurable attributes of a Controller.
type Options struct {
	SystemNamespace string

	// MeshServiceController is a mesh-wide service Controller.
	MeshServiceController *aggregate.Controller

	DomainSuffix string

	// ClusterID identifies the remote cluster in a multicluster env.
	ClusterID cluster.ID

	// ClusterAliases are alias names for clusters. When a proxy connects with a cluster ID
	// and it has a different alias, we should use that alias as the cluster ID for the proxy.
	ClusterAliases map[string]string

	// Metrics for capturing node-based metrics.
	Metrics model.Metrics

	// XDSUpdater will push changes to the xDS server.
	XDSUpdater model.XDSUpdater

	// NetworksWatcher observes changes to the mesh networks config.
	NetworksWatcher mesh.NetworksWatcher

	// MeshWatcher observes changes to the mesh config
	MeshWatcher mesh.Watcher

	// EndpointMode decides what source to use to get endpoint information
	EndpointMode EndpointMode

	// Maximum QPS when communicating with kubernetes API
	KubernetesAPIQPS float32

	// Maximum burst for throttle when communicating with the kubernetes API
	KubernetesAPIBurst int

	// SyncTimeout, if set, causes HasSynced to be returned when marked true.
	SyncTimeout *atomic.Bool

	// If meshConfig.DiscoverySelectors are specified, the DiscoveryNamespacesFilter tracks the namespaces this controller watches.
	DiscoveryNamespacesFilter filter.DiscoveryNamespacesFilter
}

// DetectEndpointMode determines whether to use Endpoints or EndpointSlice based on the
// feature flag and/or Kubernetes version
func DetectEndpointMode(kubeClient kubelib.Client) EndpointMode {
	useEndpointslice, ok := features.EnableEndpointSliceController()

	// we have a client, and flag wasn't set explicitly, auto-detect
	if kubeClient != nil && !ok && kubelib.IsAtLeastVersion(kubeClient, 21) {
		useEndpointslice = true
	}

	if useEndpointslice {
		return EndpointSliceOnly
	}
	return EndpointsOnly
}

// EndpointMode decides what source to use to get endpoint information
type EndpointMode int

const (
	// EndpointsOnly type will use only Kubernetes Endpoints
	EndpointsOnly EndpointMode = iota

	// EndpointSliceOnly type will use only Kubernetes EndpointSlices
	EndpointSliceOnly

	// TODO: add other modes. Likely want a mode with Endpoints+EndpointSlices that are not controlled by
	// Kubernetes Controller (e.g. made by user and not duplicated with Endpoints), or a mode with both that
	// does deduping. Simply doing both won't work for now, since not all Kubernetes components support EndpointSlice.
)

var EndpointModes = []EndpointMode{EndpointsOnly, EndpointSliceOnly}

var EndpointModeNames = map[EndpointMode]string{
	EndpointsOnly:     "EndpointsOnly",
	EndpointSliceOnly: "EndpointSliceOnly",
}

func (m EndpointMode) String() string {
	return EndpointModeNames[m]
}

// kubernetesNode represents a kubernetes node that is reachable externally
type kubernetesNode struct {
	address string
	labels  labels.Instance
}

// controllerInterface is a simplified interface for the Controller used for testing.
type controllerInterface interface {
	getPodLocality(pod *v1.Pod) string
	Network(endpointIP string, labels labels.Instance) network.ID
	Cluster() cluster.ID
}

var (
	_ controllerInterface      = &Controller{}
	_ serviceregistry.Instance = &Controller{}
)

// Controller is a collection of synchronized resource watchers
// Caches are thread-safe
type Controller struct {
	opts Options

	client kubelib.Client

	queue queue.Instance

	nsInformer cache.SharedIndexInformer
	nsLister   listerv1.NamespaceLister

	serviceInformer filter.FilteredSharedIndexInformer
	serviceLister   listerv1.ServiceLister

	endpoints kubeEndpointsController

	// Used to watch nodes accessible from remote clusters.
	// In multi-cluster (shared control plane, multi-network) scenarios, the ingress gateway service can be of nodePort type.
	// With this, we can populate the mesh's gateway address with the node IPs.
	nodeInformer cache.SharedIndexInformer
	nodeLister   listerv1.NodeLister

	exports serviceExportCache
	imports serviceImportCache
	pods    *PodCache

	handlers model.ControllerHandlers

	// This is only used for test
	stop chan struct{}

	sync.RWMutex
	// servicesMap stores hostname ==> service, it is used to reduce convertService calls.
	servicesMap map[host.Name]*model.Service
	// hostNamesForNamespacedName returns all possible hostnames for the given service name.
	// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
	// hostname as well as the MCS hostname (clusterset.local). Otherwise, only the regular
	// hostname will be returned.
	hostNamesForNamespacedName func(name types.NamespacedName) []host.Name
	// servicesForNamespacedName returns all services for the given service name.
	// If Kubernetes Multi-Cluster Services (MCS) is enabled, this will contain the regular
	// service as well as the MCS service (clusterset.local), if available. Otherwise,
	// only the regular service will be returned.
	servicesForNamespacedName func(name types.NamespacedName) []*model.Service
	// nodeSelectorsForServices stores hostname => label selectors that can be used to
	// refine the set of node port IPs for a service.
	nodeSelectorsForServices map[host.Name]labels.Instance
	// map of node name and its address+labels - this is the only thing we need from nodes
	// for vm to k8s or cross cluster. When node port services select specific nodes by labels,
	// we run through the label selectors here to pick only ones that we need.
	// Only nodes with ExternalIP addresses are included in this map!
	nodeInfoMap map[string]kubernetesNode
	// externalNameSvcInstanceMap stores hostname ==> instance, is used to store instances for ExternalName k8s services
	externalNameSvcInstanceMap map[host.Name][]*model.ServiceInstance
	// index over workload instances from workload entries
	workloadInstancesIndex workloadinstances.Index

	multinetwork

	// informerInit is set to true once the controller is running successfully. This ensures we do not
	// return HasSynced=true before we are running
	informerInit *atomic.Bool
	// beginSync is set to true when calling SyncAll, it indicates the controller has begun syncing resources.
	beginSync *atomic.Bool
	// initialSync is set to true after performing an initial in-order processing of all objects.
	initialSync *atomic.Bool
}

// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see multicluster.Controller).
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	c := &Controller{
		opts:                       options,
		client:                     kubeClient,
		queue:                      queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
		servicesMap:                make(map[host.Name]*model.Service),
		nodeSelectorsForServices:   make(map[host.Name]labels.Instance),
		nodeInfoMap:                make(map[string]kubernetesNode),
		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
		workloadInstancesIndex:     workloadinstances.NewIndex(),
		informerInit:               atomic.NewBool(false),
		beginSync:                  atomic.NewBool(false),
		initialSync:                atomic.NewBool(false),

		multinetwork: initMultinetwork(),
	}

	if features.EnableMCSHost {
		c.hostNamesForNamespacedName = func(name types.NamespacedName) []host.Name {
			return []host.Name{
				kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix),
				serviceClusterSetLocalHostname(name),
			}
		}
		c.servicesForNamespacedName = func(name types.NamespacedName) []*model.Service {
			out := make([]*model.Service, 0, 2)

			c.RLock()
			if svc := c.servicesMap[kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)]; svc != nil {
				out = append(out, svc)
			}

			if svc := c.servicesMap[serviceClusterSetLocalHostname(name)]; svc != nil {
				out = append(out, svc)
			}
			c.RUnlock()

			return out
		}
	} else {
		c.hostNamesForNamespacedName = func(name types.NamespacedName) []host.Name {
			return []host.Name{
				kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix),
			}
		}
		c.servicesForNamespacedName = func(name types.NamespacedName) []*model.Service {
			if svc := c.GetService(kube.ServiceHostname(name.Name, name.Namespace, c.opts.DomainSuffix)); svc != nil {
				return []*model.Service{svc}
			}
			return nil
		}
	}

	c.nsInformer = kubeClient.KubeInformer().Core().V1().Namespaces().Informer()
	c.nsLister = kubeClient.KubeInformer().Core().V1().Namespaces().Lister()
	if c.opts.SystemNamespace != "" {
		nsInformer := filter.NewFilteredSharedIndexInformer(func(obj interface{}) bool {
			ns, ok := obj.(*v1.Namespace)
			if !ok {
				log.Warnf("Namespace watch getting wrong type in event: %T", obj)
				return false
			}
			return ns.Name == c.opts.SystemNamespace
		}, c.nsInformer)
		c.registerHandlers(nsInformer, "Namespaces", c.onSystemNamespaceEvent, nil)
	}

	if c.opts.DiscoveryNamespacesFilter == nil {
		c.opts.DiscoveryNamespacesFilter = filter.NewDiscoveryNamespacesFilter(c.nsLister, options.MeshWatcher.Mesh().DiscoverySelectors)
	}

	c.initDiscoveryHandlers(kubeClient, options.EndpointMode, options.MeshWatcher, c.opts.DiscoveryNamespacesFilter)

	c.serviceInformer = filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Services().Informer())
	c.serviceLister = listerv1.NewServiceLister(c.serviceInformer.GetIndexer())

	c.registerHandlers(c.serviceInformer, "Services", c.onServiceEvent, nil)

	switch options.EndpointMode {
	case EndpointsOnly:
		c.endpoints = newEndpointsController(c)
	case EndpointSliceOnly:
		c.endpoints = newEndpointSliceController(c)
	}

	// This is for getting the node IPs of a selected set of nodes
	c.nodeInformer = kubeClient.KubeInformer().Core().V1().Nodes().Informer()
	c.nodeLister = kubeClient.KubeInformer().Core().V1().Nodes().Lister()
	c.registerHandlers(c.nodeInformer, "Nodes", c.onNodeEvent, nil)

	podInformer := filter.NewFilteredSharedIndexInformer(c.opts.DiscoveryNamespacesFilter.Filter, kubeClient.KubeInformer().Core().V1().Pods().Informer())
	c.pods = newPodCache(c, podInformer, func(key string) {
		item, exists, err := c.endpoints.getInformer().GetIndexer().GetByKey(key)
		if err != nil {
			log.Debugf("Endpoint %v lookup failed with error %v, skipping stale endpoint", key, err)
			return
		}
		if !exists {
			log.Debugf("Endpoint %v not found, skipping stale endpoint", key)
			return
		}
		if shouldEnqueue("Pods", c.beginSync) {
			c.queue.Push(func() error {
				return c.endpoints.onEvent(item, model.EventUpdate)
			})
		}
	})
	c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)

	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)

	return c
}

func (c *Controller) Provider() provider.ID {
	return provider.Kubernetes
}

func (c *Controller) Cluster() cluster.ID {
	return c.opts.ClusterID
}

func (c *Controller) MCSServices() []model.MCSServiceInfo {
	outMap := make(map[types.NamespacedName]*model.MCSServiceInfo)

	// Add the ServiceExport info.
	for _, se := range c.exports.ExportedServices() {
		mcsService := outMap[se.namespacedName]
		if mcsService == nil {
			mcsService = &model.MCSServiceInfo{}
			outMap[se.namespacedName] = mcsService
		}
		mcsService.Cluster = c.Cluster()
		mcsService.Name = se.namespacedName.Name
		mcsService.Namespace = se.namespacedName.Namespace
		mcsService.Exported = true
		mcsService.Discoverability = se.discoverability
	}

	// Add the ServiceImport info.
	for _, si := range c.imports.ImportedServices() {
		mcsService := outMap[si.namespacedName]
		if mcsService == nil {
			mcsService = &model.MCSServiceInfo{}
			outMap[si.namespacedName] = mcsService
		}
		mcsService.Cluster = c.Cluster()
		mcsService.Name = si.namespacedName.Name
		mcsService.Namespace = si.namespacedName.Namespace
		mcsService.Imported = true
		mcsService.ClusterSetVIP = si.clusterSetVIP
	}

	out := make([]model.MCSServiceInfo, 0, len(outMap))
	for _, v := range outMap {
		out = append(out, *v)
	}

	return out
}

func (c *Controller) networkFromMeshNetworks(endpointIP string) network.ID {
	c.RLock()
	defer c.RUnlock()
	if c.networkForRegistry != "" {
		return c.networkForRegistry
	}

	if c.ranger != nil {
		entries, err := c.ranger.ContainingNetworks(net.ParseIP(endpointIP))
		if err != nil {
			log.Error(err)
			return ""
		}
		if len(entries) > 1 {
			log.Warnf("Found multiple networks CIDRs matching the endpoint IP: %s. Using the first match.", endpointIP)
		}
		if len(entries) > 0 {
			return (entries[0].(namedRangerEntry)).name
		}
	}
	return ""
}

func (c *Controller) networkFromSystemNamespace() network.ID {
	c.RLock()
	defer c.RUnlock()
	return c.network
}

func (c *Controller) Network(endpointIP string, labels labels.Instance) network.ID {
	// 1. check the pod/workloadEntry label
	if nw := labels[label.TopologyNetwork.Name]; nw != "" {
		return network.ID(nw)
	}

	// 2. check the system namespace labels
	if nw := c.networkFromSystemNamespace(); nw != "" {
		return nw
	}

	// 3. check the meshNetworks config
	if nw := c.networkFromMeshNetworks(endpointIP); nw != "" {
		return nw
	}

	return ""
}

func (c *Controller) Cleanup() error {
	if err := queue.WaitForClose(c.queue, 30*time.Second); err != nil {
		log.Warnf("queue for removed kube registry %q may not be done processing: %v", c.Cluster(), err)
	}
	if c.opts.XDSUpdater != nil {
		c.opts.XDSUpdater.RemoveShard(model.ShardKeyFromRegistry(c))
	}
	return nil
}

func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
	svc, err := convertToService(curr)
	if err != nil {
		log.Errorf(err)
		return nil
	}

	log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)

	// Create the standard (cluster.local) service.
	svcConv := kube.ConvertService(*svc, c.opts.DomainSuffix, c.Cluster())

	switch event {
	case model.EventDelete:
		c.deleteService(svcConv)
	default:
		c.addOrUpdateService(svc, svcConv, event, false)
	}

	return nil
}

func (c *Controller) deleteService(svc *model.Service) {
	c.Lock()
	delete(c.servicesMap, svc.Hostname)
	delete(c.nodeSelectorsForServices, svc.Hostname)
	delete(c.externalNameSvcInstanceMap, svc.Hostname)
	_, isNetworkGateway := c.networkGatewaysBySvc[svc.Hostname]
	delete(c.networkGatewaysBySvc, svc.Hostname)
	c.Unlock()

	if isNetworkGateway {
		c.NotifyGatewayHandlers()
		// TODO trigger push via handler
		// networks are different, we need to update all eds endpoints
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: []model.TriggerReason{model.NetworksTrigger}})
	}

	shard := model.ShardKeyFromRegistry(c)
	event := model.EventDelete
	c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, event)
	c.handlers.NotifyServiceHandlers(svc, event)
}

func (c *Controller) addOrUpdateService(svc *v1.Service, svcConv *model.Service, event model.Event, updateEDSCache bool) {
	needsFullPush := false
	// First, process nodePort gateway service, whose externalIPs specified
	// and loadbalancer gateway service
	if !svcConv.Attributes.ClusterExternalAddresses.IsEmpty() {
		needsFullPush = c.extractGatewaysFromService(svcConv)
	} else if isNodePortGatewayService(svc) {
		// We need to know which services are using node selectors because during node events,
		// we have to update all the node port services accordingly.
		nodeSelector := getNodeSelectorsForService(svc)
		c.Lock()
		// only add when it is nodePort gateway service
		c.nodeSelectorsForServices[svcConv.Hostname] = nodeSelector
		c.Unlock()
		needsFullPush = c.updateServiceNodePortAddresses(svcConv)
	}

	// instance conversion is only required when service is added/updated.
	instances := kube.ExternalNameServiceInstances(svc, svcConv)
	c.Lock()
	c.servicesMap[svcConv.Hostname] = svcConv
	if len(instances) > 0 {
		c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
	}
	c.Unlock()

	if needsFullPush {
		// networks are different, we need to update all eds endpoints
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: []model.TriggerReason{model.NetworksTrigger}})
	}

	shard := model.ShardKeyFromRegistry(c)
	ns := svcConv.Attributes.Namespace
	// We also need to update when the Service changes. For Kubernetes, a service change will result in Endpoint updates,
	// but workload entries will also need to be updated.
	// TODO(nmittler): Build different sets of endpoints for cluster.local and clusterset.local.
	endpoints := c.buildEndpointsForService(svcConv, updateEDSCache)
	if len(endpoints) > 0 {
		c.opts.XDSUpdater.EDSCacheUpdate(shard, string(svcConv.Hostname), ns, endpoints)
	}

	c.opts.XDSUpdater.SvcUpdate(shard, string(svcConv.Hostname), ns, event)
	c.handlers.NotifyServiceHandlers(svcConv, event)
}

func (c *Controller) buildEndpointsForService(svc *model.Service, updateCache bool) []*model.IstioEndpoint {
	endpoints := c.endpoints.buildIstioEndpointsWithService(svc.Attributes.Name, svc.Attributes.Namespace, svc.Hostname, updateCache)
	fep := c.collectWorkloadInstanceEndpoints(svc)
	endpoints = append(endpoints, fep...)
	return endpoints
}

func (c *Controller) onNodeEvent(obj interface{}, event model.Event) error {
	node, ok := obj.(*v1.Node)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Errorf("couldn't get object from tombstone %+v", obj)
			return nil
		}
		node, ok = tombstone.Obj.(*v1.Node)
		if !ok {
			log.Errorf("tombstone contained object that is not a node %#v", obj)
			return nil
		}
	}
	var updatedNeeded bool
	if event == model.EventDelete {
		updatedNeeded = true
		c.Lock()
		delete(c.nodeInfoMap, node.Name)
		c.Unlock()
	} else {
		k8sNode := kubernetesNode{labels: node.Labels}
		for _, address := range node.Status.Addresses {
			if address.Type == v1.NodeExternalIP && address.Address != "" {
				k8sNode.address = address.Address
				break
			}
		}
		if k8sNode.address == "" {
			return nil
		}

		c.Lock()
		// check if the node exists as this add event could be due to controller resync
		// if the stored object changes, then fire an update event. Otherwise, ignore this event.
		currentNode, exists := c.nodeInfoMap[node.Name]
		if !exists || !nodeEquals(currentNode, k8sNode) {
			c.nodeInfoMap[node.Name] = k8sNode
			updatedNeeded = true
		}
		c.Unlock()
	}

	// update all related services
	if updatedNeeded && c.updateServiceNodePortAddresses() {
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
			Full:   true,
			Reason: []model.TriggerReason{model.ServiceUpdate},
		})
	}
	return nil
}

// FilterOutFunc func for filtering out objects during update callback
type FilterOutFunc func(old, cur interface{}) bool

func (c *Controller) registerHandlers(
	informer filter.FilteredSharedIndexInformer, otype string,
	handler func(interface{}, model.Event) error, filter FilterOutFunc,
) {
	wrappedHandler := func(obj interface{}, event model.Event) error {
		obj = tryGetLatestObject(informer, obj)
		return handler(obj, event)
	}
	if informer, ok := informer.(cache.SharedInformer); ok {
		_ = informer.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(c.Cluster()))
	}
	informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				incrementEvent(otype, "add")
				if !shouldEnqueue(otype, c.beginSync) {
					return
				}
				c.queue.Push(func() error {
					return wrappedHandler(obj, model.EventAdd)
				})
			},
			UpdateFunc: func(old, cur interface{}) {
				if filter != nil {
					if filter(old, cur) {
						incrementEvent(otype, "updatesame")
						return
					}
				}
				incrementEvent(otype, "update")
				if !shouldEnqueue(otype, c.beginSync) {
					return
				}
				c.queue.Push(func() error {
					return wrappedHandler(cur, model.EventUpdate)
				})
			},
			DeleteFunc: func(obj interface{}) {
				incrementEvent(otype, "delete")
				if !shouldEnqueue(otype, c.beginSync) {
					return
				}
				c.queue.Push(func() error {
					return handler(obj, model.EventDelete)
				})
			},
		})
}

// tryGetLatestObject attempts to fetch the latest version of the object from the cache.
// Changes may have occurred between queuing and processing.
func tryGetLatestObject(informer filter.FilteredSharedIndexInformer, obj interface{}) interface{} {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		log.Warnf("failed creating key for informer object: %v", err)
		return obj
	}

	latest, exists, err := informer.GetIndexer().GetByKey(key)
	if !exists || err != nil {
		log.Warnf("couldn't find %q in informer index", key)
		return obj
	}

	return latest
}

// HasSynced returns true after the initial state synchronization
func (c *Controller) HasSynced() bool {
	return (c.opts.SyncTimeout != nil && c.opts.SyncTimeout.Load()) || c.initialSync.Load()
}

func (c *Controller) informersSynced() bool {
	if !c.informerInit.Load() {
		// registration/Run of informers hasn't occurred yet
		return false
	}
	if (c.nsInformer != nil && !c.nsInformer.HasSynced()) ||
		!c.serviceInformer.HasSynced() ||
		!c.endpoints.HasSynced() ||
		!c.pods.informer.HasSynced() ||
		!c.nodeInformer.HasSynced() ||
		!c.exports.HasSynced() ||
		!c.imports.HasSynced() {
		return false
	}
	return true
}

// SyncAll syncs all the objects node->service->pod->endpoint in order
// TODO: sync same kind of objects in parallel
// This can cause great performance cost in multi clusters scenario.
// Maybe just sync the cache and trigger one push at last.
func (c *Controller) SyncAll() error {
	c.beginSync.Store(true)
	var err *multierror.Error
	err = multierror.Append(err, c.syncDiscoveryNamespaces())
	err = multierror.Append(err, c.syncSystemNamespace())
	err = multierror.Append(err, c.syncNodes())
	err = multierror.Append(err, c.syncServices())
	err = multierror.Append(err, c.syncPods())
	err = multierror.Append(err, c.syncEndpoints())

	return multierror.Flatten(err.ErrorOrNil())
}

func (c *Controller) syncSystemNamespace() error {
	var err error
	if c.nsLister != nil {
		sysNs, _ := c.nsLister.Get(c.opts.SystemNamespace)
		log.Debugf("initializing systemNamespace:%s", c.opts.SystemNamespace)
		if sysNs != nil {
			err = c.onSystemNamespaceEvent(sysNs, model.EventAdd)
		}
	}
	return err
}

func (c *Controller) syncDiscoveryNamespaces() error {
	var err error
	if c.nsLister != nil {
		err = c.opts.DiscoveryNamespacesFilter.SyncNamespaces()
	}
	return err
}

func (c *Controller) syncNodes() error {
	var err *multierror.Error
	nodes := c.nodeInformer.GetIndexer().List()
	log.Debugf("initializing %d nodes", len(nodes))
	for _, s := range nodes {
		err = multierror.Append(err, c.onNodeEvent(s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

func (c *Controller) syncServices() error {
	var err *multierror.Error
	services := c.serviceInformer.GetIndexer().List()
	log.Debugf("initializing %d services", len(services))
	for _, s := range services {
		err = multierror.Append(err, c.onServiceEvent(s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

func (c *Controller) syncPods() error {
	var err *multierror.Error
	pods := c.pods.informer.GetIndexer().List()
	log.Debugf("initializing %d pods", len(pods))
	for _, s := range pods {
		err = multierror.Append(err, c.pods.onEvent(s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

func (c *Controller) syncEndpoints() error {
	var err *multierror.Error
	endpoints := c.endpoints.getInformer().GetIndexer().List()
	log.Debugf("initializing %d endpoints", len(endpoints))
	for _, s := range endpoints {
		err = multierror.Append(err, c.endpoints.onEvent(s, model.EventAdd))
	}
	return err.ErrorOrNil()
}

// Run all controllers until a signal is received
func (c *Controller) Run(stop <-chan struct{}) {
	st := time.Now()
	if c.opts.NetworksWatcher != nil {
		c.opts.NetworksWatcher.AddNetworksHandler(c.reloadNetworkLookup)
		c.reloadMeshNetworks()
		c.reloadNetworkGateways()
	}
	c.informerInit.Store(true)

	cache.WaitForCacheSync(stop, c.informersSynced)
	// after informer caches sync the first time, process resources in order
	if err := c.SyncAll(); err != nil {
		log.Errorf("one or more errors force-syncing resources: %v", err)
	}
	c.initialSync.Store(true)
	log.Infof("kube controller for %s synced after %v", c.opts.ClusterID, time.Since(st))
	// after the in-order sync we can start processing the queue
	c.queue.Run(stop)
	log.Infof("Controller terminated")
}

// Stop the controller. Only for tests, to simplify the code (defer c.Stop())
func (c *Controller) Stop() {
	if c.stop != nil {
		close(c.stop)
	}
}

// Services implements a service catalog operation
func (c *Controller) Services() []*model.Service {
	c.RLock()
	out := make([]*model.Service, 0, len(c.servicesMap))
	for _, svc := range c.servicesMap {
		out = append(out, svc)
	}
	c.RUnlock()
	sort.Slice(out, func(i, j int) bool { return out[i].Hostname < out[j].Hostname })
	return out
}

// GetService implements a service catalog operation by hostname specified.
func (c *Controller) GetService(hostname host.Name) *model.Service {
	c.RLock()
	svc := c.servicesMap[hostname]
	c.RUnlock()
	return svc
}

// getPodLocality retrieves the locality for a pod.
func (c *Controller) getPodLocality(pod *v1.Pod) string {
	// if pod has `istio-locality` label, skip below ops
	if len(pod.Labels[model.LocalityLabel]) > 0 {
		return model.GetLocalityLabelOrDefault(pod.Labels[model.LocalityLabel], "")
	}

	// NodeName is set by the scheduler after the pod is created
	// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#late-initialization
	raw, err := c.nodeLister.Get(pod.Spec.NodeName)
	if err != nil {
		if pod.Spec.NodeName != "" {
			log.Warnf("unable to get node %q for pod %q/%q: %v", pod.Spec.NodeName, pod.Namespace, pod.Name, err)
		}
		return ""
	}

	nodeMeta, err := meta.Accessor(raw)
	if err != nil {
		log.Warnf("unable to get node meta: %v", nodeMeta)
		return ""
	}

	region := getLabelValue(nodeMeta, NodeRegionLabel, NodeRegionLabelGA)
	zone := getLabelValue(nodeMeta, NodeZoneLabel, NodeZoneLabelGA)
	subzone := getLabelValue(nodeMeta, label.TopologySubzone.Name, "")

	if region == "" && zone == "" && subzone == "" {
		return ""
	}

	return region + "/" + zone + "/" + subzone // Format: "%s/%s/%s"
}

// InstancesByPort implements a service catalog operation
func (c *Controller) InstancesByPort(svc *model.Service, reqSvcPort int, labels labels.Instance) []*model.ServiceInstance {
	// First get k8s standard service instances and the workload entry instances
	outInstances := c.endpoints.InstancesByPort(c, svc, reqSvcPort, labels)
	outInstances = append(outInstances, c.serviceInstancesFromWorkloadInstances(svc, reqSvcPort)...)
	// return when instances found or an error occurs
	if len(outInstances) > 0 {
		return outInstances
	}

	// Fall back to external name service since we did not find any instances of normal services
	c.RLock()
	externalNameInstances := c.externalNameSvcInstanceMap[svc.Hostname]
	c.RUnlock()
	if externalNameInstances != nil {
		inScopeInstances := make([]*model.ServiceInstance, 0)
		for _, i := range externalNameInstances {
			if i.Service.Attributes.Namespace == svc.Attributes.Namespace && i.ServicePort.Port == reqSvcPort {
				inScopeInstances = append(inScopeInstances, i)
			}
		}
		return inScopeInstances
	}
	return nil
}

func (c *Controller) serviceInstancesFromWorkloadInstances(svc *model.Service, reqSvcPort int) []*model.ServiceInstance {
	// Run through all the workload instances, select ones that match the service labels
	// only if this is a kubernetes internal service and of ClientSideLB (eds) type
	// as InstancesByPort is called by the aggregate controller. We dont want to include
	// workload instances for any other registry
	workloadInstancesExist := !c.workloadInstancesIndex.Empty()
	c.RLock()
	_, inRegistry := c.servicesMap[svc.Hostname]
	c.RUnlock()

	// Only select internal Kubernetes services with selectors
	if !inRegistry || !workloadInstancesExist || svc.Attributes.ServiceRegistry != provider.Kubernetes ||
		svc.MeshExternal || svc.Resolution != model.ClientSideLB || svc.Attributes.LabelSelectors == nil {
		return nil
	}

	selector := labels.Instance(svc.Attributes.LabelSelectors)

	// Get the service port name and target port so that we can construct the service instance
	k8sService, err := c.serviceLister.Services(svc.Attributes.Namespace).Get(svc.Attributes.Name)
	// We did not find the k8s service. We cannot get the targetPort
	if err != nil {
		log.Infof("serviceInstancesFromWorkloadInstances(%s.%s) failed to get k8s service => error %v", svc.Attributes.Name, svc.Attributes.Namespace, err)
		return nil
	}

	var servicePort *model.Port
	for _, p := range svc.Ports {
		if p.Port == reqSvcPort {
			servicePort = p
			break
		}
	}
	if servicePort == nil {
		return nil
	}

	// Now get the target Port for this service port
	targetPort := findServiceTargetPort(servicePort, k8sService)
	if targetPort.num == 0 {
		targetPort.num = servicePort.Port
	}

	out := make([]*model.ServiceInstance, 0)

	c.workloadInstancesIndex.ForEach(func(wi *model.WorkloadInstance) {
		if wi.Namespace != svc.Attributes.Namespace {
			return
		}
		if selector.SubsetOf(wi.Endpoint.Labels) {
			instance := serviceInstanceFromWorkloadInstance(svc, servicePort, targetPort, wi)
			if instance != nil {
				out = append(out, instance)
			}
		}
	})
	return out
}

func serviceInstanceFromWorkloadInstance(svc *model.Service, servicePort *model.Port, targetPort serviceTargetPort, wi *model.WorkloadInstance) *model.ServiceInstance {
	// create an instance with endpoint whose service port name matches
	istioEndpoint := *wi.Endpoint

	// by default, use the numbered targetPort
	istioEndpoint.EndpointPort = uint32(targetPort.num)

	if targetPort.name != "" {
		// This is a named port, find the corresponding port in the port map
		matchedPort := wi.PortMap[targetPort.name]
		if matchedPort != 0 {
			istioEndpoint.EndpointPort = matchedPort
		} else if targetPort.explicitName {
			// No match found, and we expect the name explicitly in the service, skip this endpoint
			return nil
		}
	}

	istioEndpoint.ServicePortName = servicePort.Name
	return &model.ServiceInstance{
		Service:     svc,
		ServicePort: servicePort,
		Endpoint:    &istioEndpoint,
	}
}

// convenience function to collect all workload entry endpoints in updateEDS calls.
func (c *Controller) collectWorkloadInstanceEndpoints(svc *model.Service) []*model.IstioEndpoint {
	workloadInstancesExist := !c.workloadInstancesIndex.Empty()

	if !workloadInstancesExist || svc.Resolution != model.ClientSideLB || len(svc.Ports) == 0 {
		return nil
	}

	endpoints := make([]*model.IstioEndpoint, 0)
	for _, port := range svc.Ports {
		for _, instance := range c.serviceInstancesFromWorkloadInstances(svc, port.Port) {
			endpoints = append(endpoints, instance.Endpoint)
		}
	}

	return endpoints
}

// GetProxyServiceInstances returns service instances co-located with a given proxy
// TODO: this code does not return k8s service instances when the proxy's IP is a workload entry
// To tackle this, we need a ip2instance map like what we have in service entry.
func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) []*model.ServiceInstance {
	if len(proxy.IPAddresses) > 0 {
		proxyIP := proxy.IPAddresses[0]
		// look up for a WorkloadEntry; if there are multiple WorkloadEntry(s)
		// with the same IP, choose one deterministically
		workload := workloadinstances.GetInstanceForProxy(c.workloadInstancesIndex, proxy, proxyIP)
		if workload != nil {
			return c.serviceInstancesFromWorkloadInstance(workload)
		}
		pod := c.pods.getPodByProxy(proxy)
		if pod != nil && !proxy.IsVM() {
			// we don't want to use this block for our test "VM" which is actually a Pod.

			if !c.isControllerForProxy(proxy) {
				log.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata.ClusterID, c.Cluster())
				return nil
			}

			// 1. find proxy service by label selector, if not any, there may exist headless service without selector
			// failover to 2
			if services, err := getPodServices(c.serviceLister, pod); err == nil && len(services) > 0 {
				out := make([]*model.ServiceInstance, 0)
				for _, svc := range services {
					out = append(out, c.getProxyServiceInstancesByPod(pod, svc, proxy)...)
				}
				return out
			}
			// 2. Headless service without selector
			return c.endpoints.GetProxyServiceInstances(c, proxy)
		}

		// 3. The pod is not present when this is called
		// due to eventual consistency issues. However, we have a lot of information about the pod from the proxy
		// metadata already. Because of this, we can still get most of the information we need.
		// If we cannot accurately construct ServiceInstances from just the metadata, this will return an error and we can
		// attempt to read the real pod.
		out, err := c.getProxyServiceInstancesFromMetadata(proxy)
		if err != nil {
			log.Warnf("getProxyServiceInstancesFromMetadata for %v failed: %v", proxy.ID, err)
		}
		return out
	}

	// TODO: This could not happen, remove?
	if c.opts.Metrics != nil {
		c.opts.Metrics.AddMetric(model.ProxyStatusNoService, proxy.ID, proxy.ID, "")
	} else {
		log.Infof("Missing metrics env, empty list of services for pod %s", proxy.ID)
	}
	return nil
}

func (c *Controller) serviceInstancesFromWorkloadInstance(si *model.WorkloadInstance) []*model.ServiceInstance {
	out := make([]*model.ServiceInstance, 0)
	// find the workload entry's service by label selector
	// rather than scanning through our internal map of model.services, get the services via the k8s apis
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
	}

	// find the services that map to this workload entry, fire off eds updates if the service is of type client-side lb
	if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 {
		for _, k8sSvc := range k8sServices {
			service := c.GetService(kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.opts.DomainSuffix))
			// Note that this cannot be an external service because k8s external services do not have label selectors.
			if service == nil || service.Resolution != model.ClientSideLB {
				// may be a headless service
				continue
			}

			for _, servicePort := range service.Ports {
				if servicePort.Protocol == protocol.UDP {
					continue
				}

				// Now get the target Port for this service port
				targetPort := findServiceTargetPort(servicePort, k8sSvc)
				if targetPort.num == 0 {
					targetPort.num = servicePort.Port
				}

				instance := serviceInstanceFromWorkloadInstance(service, servicePort, targetPort, si)
				if instance != nil {
					out = append(out, instance)
				}
			}
		}
	}
	return out
}

// WorkloadInstanceHandler defines the handler for service instances generated by other registries
func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {
	// ignore malformed workload entries. And ignore any workload entry that does not have a label
	// as there is no way for us to select them
	if si.Namespace == "" || len(si.Endpoint.Labels) == 0 {
		return
	}

	// this is from a workload entry. Store it in separate index so that
	// the InstancesByPort can use these as well as the k8s pods.
	switch event {
	case model.EventDelete:
		c.workloadInstancesIndex.Delete(si)
	default: // add or update
		c.workloadInstancesIndex.Insert(si)
	}

	// find the workload entry's service by label selector
	// rather than scanning through our internal map of model.services, get the services via the k8s apis
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
	}

	shard := model.ShardKeyFromRegistry(c)
	// find the services that map to this workload entry, fire off eds updates if the service is of type client-side lb
	if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 {
		for _, k8sSvc := range k8sServices {
			service := c.GetService(kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.opts.DomainSuffix))
			// Note that this cannot be an external service because k8s external services do not have label selectors.
			if service == nil || service.Resolution != model.ClientSideLB {
				// may be a headless service
				continue
			}

			// Get the updated list of endpoints that includes k8s pods and the workload entries for this service
			// and then notify the EDS server that endpoints for this service have changed.
			// We need one endpoint object for each service port
			endpoints := make([]*model.IstioEndpoint, 0)
			for _, port := range service.Ports {
				if port.Protocol == protocol.UDP {
					continue
				}

				// Similar code as UpdateServiceShards in eds.go
				instances := c.InstancesByPort(service, port.Port, nil)
				for _, inst := range instances {
					endpoints = append(endpoints, inst.Endpoint)
				}
			}

			// fire off eds update
			c.opts.XDSUpdater.EDSUpdate(shard, string(service.Hostname), service.Attributes.Namespace, endpoints)
		}
	}
}

func (c *Controller) onSystemNamespaceEvent(obj interface{}, ev model.Event) error {
	if ev == model.EventDelete {
		return nil
	}
	ns, ok := obj.(*v1.Namespace)
	if !ok {
		log.Warnf("Namespace watch getting wrong type in event: %T", obj)
		return nil
	}
	if ns == nil {
		return nil
	}
	nw := ns.Labels[label.TopologyNetwork.Name]
	c.Lock()
	oldDefaultNetwork := c.network
	c.network = network.ID(nw)
	c.Unlock()
	// network changed, rarely happen
	if oldDefaultNetwork != c.network {
		// refresh pods/endpoints/services
		c.onDefaultNetworkChange()
	}
	return nil
}

// isControllerForProxy should be used for proxies assumed to be in the kube cluster for this controller. Workload Entries
// may not necessarily pass this check, but we still want to allow kube services to select workload instances.
func (c *Controller) isControllerForProxy(proxy *model.Proxy) bool {
	return proxy.Metadata.ClusterID == "" || proxy.Metadata.ClusterID == c.Cluster()
}

// getProxyServiceInstancesFromMetadata retrieves ServiceInstances using proxy Metadata rather than
// from the Pod. This allows retrieving Instances immediately, regardless of delays in Kubernetes.
// If the proxy doesn't have enough metadata, an error is returned
func (c *Controller) getProxyServiceInstancesFromMetadata(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
	if len(proxy.Metadata.Labels) == 0 {
		return nil, nil
	}

	if !c.isControllerForProxy(proxy) {
		return nil, fmt.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata.ClusterID, c.Cluster())
	}

	// Create a pod with just the information needed to find the associated Services
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: proxy.ConfigNamespace,
			Labels:    proxy.Metadata.Labels,
		},
	}

	// Find the Service associated with the pod.
	services, err := getPodServices(c.serviceLister, dummyPod)
	if err != nil {
		return nil, fmt.Errorf("error getting instances for %s: %v", proxy.ID, err)
	}
	if len(services) == 0 {
		return nil, fmt.Errorf("no instances found for %s: %v", proxy.ID, err)
	}

	out := make([]*model.ServiceInstance, 0)
	for _, svc := range services {
		hostname := kube.ServiceHostname(svc.Name, svc.Namespace, c.opts.DomainSuffix)
		modelService := c.GetService(hostname)
		if modelService == nil {
			return nil, fmt.Errorf("failed to find model service for %v", hostname)
		}

		for _, modelService := range c.servicesForNamespacedName(kube.NamespacedNameForK8sObject(svc)) {
			discoverabilityPolicy := c.exports.EndpointDiscoverabilityPolicy(modelService)

			tps := make(map[model.Port]*model.Port)
			tpsList := make([]model.Port, 0)
			for _, port := range svc.Spec.Ports {
				svcPort, f := modelService.Ports.Get(port.Name)
				if !f {
					return nil, fmt.Errorf("failed to get svc port for %v", port.Name)
				}

				var portNum int
				if len(proxy.Metadata.PodPorts) > 0 {
					portNum, err = findPortFromMetadata(port, proxy.Metadata.PodPorts)
					if err != nil {
						return nil, fmt.Errorf("failed to find target port for %v: %v", proxy.ID, err)
					}
				} else {
					// most likely a VM - we assume the WorkloadEntry won't remap any ports
					portNum = port.TargetPort.IntValue()
				}

				// Dedupe the target ports here - Service might have configured multiple ports to the same target port,
				// we will have to create only one ingress listener per port and protocol so that we do not end up
				// complaining about listener conflicts.
				targetPort := model.Port{
					Port:     portNum,
					Protocol: svcPort.Protocol,
				}
				if _, exists := tps[targetPort]; !exists {
					tps[targetPort] = svcPort
					tpsList = append(tpsList, targetPort)
				}
			}

			epBuilder := NewEndpointBuilderFromMetadata(c, proxy)
			// Iterate over target ports in the same order as defined in the service spec, in case a
			// protocol conflict for a port causes unstable protocol selection for that port.
			for _, tp := range tpsList {
				svcPort := tps[tp]
				// consider multiple IP scenarios
				for _, ip := range proxy.IPAddresses {
					// Construct the ServiceInstance
					out = append(out, &model.ServiceInstance{
						Service:     modelService,
						ServicePort: svcPort,
						Endpoint:    epBuilder.buildIstioEndpoint(ip, int32(tp.Port), svcPort.Name, discoverabilityPolicy),
					})
				}
			}
		}
	}
	return out, nil
}

func (c *Controller) getProxyServiceInstancesByPod(pod *v1.Pod, service *v1.Service, proxy *model.Proxy) []*model.ServiceInstance {
	var out []*model.ServiceInstance

	for _, svc := range c.servicesForNamespacedName(kube.NamespacedNameForK8sObject(service)) {
		discoverabilityPolicy := c.exports.EndpointDiscoverabilityPolicy(svc)

		tps := make(map[model.Port]*model.Port)
		tpsList := make([]model.Port, 0)
		for _, port := range service.Spec.Ports {
			svcPort, exists := svc.Ports.Get(port.Name)
			if !exists {
				continue
			}
			// find target port
			portNum, err := FindPort(pod, &port)
			if err != nil {
				log.Warnf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
				continue
			}
			// Dedupe the target ports here - Service might have configured multiple ports to the same target port,
			// we will have to create only one ingress listener per port and protocol so that we do not end up
			// complaining about listener conflicts.
			targetPort := model.Port{
				Port:     portNum,
				Protocol: svcPort.Protocol,
			}
			if _, exists := tps[targetPort]; !exists {
				tps[targetPort] = svcPort
				tpsList = append(tpsList, targetPort)
			}
		}

		builder := NewEndpointBuilder(c, pod)
		// Iterate over target ports in the same order as defined in the service spec, in case a
		// protocol conflict for a port causes unstable protocol selection for that port.
		for _, tp := range tpsList {
			svcPort := tps[tp]
			// consider multiple IP scenarios
			for _, ip := range proxy.IPAddresses {
				istioEndpoint := builder.buildIstioEndpoint(ip, int32(tp.Port), svcPort.Name, discoverabilityPolicy)
				out = append(out, &model.ServiceInstance{
					Service:     svc,
					ServicePort: svcPort,
					Endpoint:    istioEndpoint,
				})
			}
		}
	}

	return out
}

func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
	pod := c.pods.getPodByProxy(proxy)
	if pod != nil {
		return pod.Labels
	}
	return nil
}

// GetIstioServiceAccounts returns the Istio service accounts running a service
// hostname. Each service account is encoded according to the SPIFFE VSID spec.
// For example, a service account named "bar" in namespace "foo" is encoded as
// "spiffe://cluster.local/ns/foo/sa/bar".
func (c *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string {
	return model.GetServiceAccounts(svc, ports, c)
}

// AppendServiceHandler implements a service catalog operation
func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) {
	c.handlers.AppendServiceHandler(f)
}

// AppendWorkloadHandler implements a service catalog operation
func (c *Controller) AppendWorkloadHandler(f func(*model.WorkloadInstance, model.Event)) {
	c.handlers.AppendWorkloadHandler(f)
}
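
For orientation, below is a minimal, hypothetical usage sketch (not part of controller.go) of how an embedding program might construct and run this registry controller for one cluster, wiring Options to NewController and Run. The kubeClient, meshWatcher, and xdsUpdater parameters, the namespace and domain values, and the kubeClient.RunAndWait call are assumptions about the surrounding bootstrap code, not APIs defined in this file.

// runKubeRegistry is an illustrative sketch only. It assumes the caller already has a
// kubelib.Client, a mesh.Watcher, and a model.XDSUpdater from bootstrap.
func runKubeRegistry(kubeClient kubelib.Client, meshWatcher mesh.Watcher, xdsUpdater model.XDSUpdater, stop <-chan struct{}) *Controller {
	opts := Options{
		SystemNamespace: "istio-system",                // assumed control-plane namespace
		DomainSuffix:    "cluster.local",               // assumed cluster domain
		ClusterID:       cluster.ID("Kubernetes"),      // assumed cluster name
		EndpointMode:    DetectEndpointMode(kubeClient), // Endpoints vs. EndpointSlice auto-detection
		MeshWatcher:     meshWatcher,
		XDSUpdater:      xdsUpdater,
		SyncTimeout:     atomic.NewBool(false),
	}
	c := NewController(kubeClient, opts)

	// Start the shared informers registered by NewController, then run the controller:
	// Run waits for informer caches to sync, performs the in-order SyncAll, and then
	// processes the event queue until stop is closed. HasSynced() turns true after SyncAll.
	go kubeClient.RunAndWait(stop)
	go c.Run(stop)
	return c
}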