pilot/pkg/serviceregistry/aggregate/controller.go (312 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package aggregate import ( "sort" "sync" ) import ( "istio.io/pkg/log" ) import ( "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry" "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/provider" "github.com/apache/dubbo-go-pixiu/pkg/cluster" "github.com/apache/dubbo-go-pixiu/pkg/config/host" "github.com/apache/dubbo-go-pixiu/pkg/config/labels" "github.com/apache/dubbo-go-pixiu/pkg/config/mesh" "github.com/apache/dubbo-go-pixiu/pkg/spiffe" ) // The aggregate controller does not implement serviceregistry.Instance since it may be comprised of various // providers and clusters. var ( _ model.ServiceDiscovery = &Controller{} _ model.AggregateController = &Controller{} ) // Controller aggregates data across different registries and monitors for changes type Controller struct { meshHolder mesh.Holder // The lock is used to protect the registries and controller's running status. storeLock sync.RWMutex registries []*registryEntry // indicates whether the controller has run. // if true, all the registries added later should be run manually. running bool handlers model.ControllerHandlers handlersByCluster map[cluster.ID]*model.ControllerHandlers model.NetworkGatewaysHandler } type registryEntry struct { serviceregistry.Instance // stop if not nil is the per-registry stop chan. If null, the server stop chan should be used to Run the registry. stop <-chan struct{} } type Options struct { MeshHolder mesh.Holder } // NewController creates a new Aggregate controller func NewController(opt Options) *Controller { return &Controller{ registries: make([]*registryEntry, 0), meshHolder: opt.MeshHolder, running: false, handlersByCluster: map[cluster.ID]*model.ControllerHandlers{}, } } func (c *Controller) addRegistry(registry serviceregistry.Instance, stop <-chan struct{}) { c.registries = append(c.registries, &registryEntry{Instance: registry, stop: stop}) // Observe the registry for events. registry.AppendNetworkGatewayHandler(c.NotifyGatewayHandlers) registry.AppendServiceHandler(c.handlers.NotifyServiceHandlers) registry.AppendServiceHandler(func(service *model.Service, event model.Event) { for _, handlers := range c.getClusterHandlers() { handlers.NotifyServiceHandlers(service, event) } }) } func (c *Controller) getClusterHandlers() []*model.ControllerHandlers { c.storeLock.Lock() defer c.storeLock.Unlock() out := make([]*model.ControllerHandlers, 0, len(c.handlersByCluster)) for _, handlers := range c.handlersByCluster { out = append(out, handlers) } return out } // AddRegistry adds registries into the aggregated controller. // If the aggregated controller is already Running, the given registry will never be started. func (c *Controller) AddRegistry(registry serviceregistry.Instance) { c.storeLock.Lock() defer c.storeLock.Unlock() c.addRegistry(registry, nil) } // AddRegistryAndRun adds registries into the aggregated controller and makes sure it is Run. // If the aggregated controller is running, the given registry is Run immediately. // Otherwise, the given registry is Run when the aggregate controller is Run, using the given stop. func (c *Controller) AddRegistryAndRun(registry serviceregistry.Instance, stop <-chan struct{}) { if stop == nil { log.Warnf("nil stop channel passed to AddRegistryAndRun for registry %s/%s", registry.Provider(), registry.Cluster()) } c.storeLock.Lock() defer c.storeLock.Unlock() c.addRegistry(registry, stop) if c.running { go registry.Run(stop) } } // DeleteRegistry deletes specified registry from the aggregated controller func (c *Controller) DeleteRegistry(clusterID cluster.ID, providerID provider.ID) { c.storeLock.Lock() defer c.storeLock.Unlock() if len(c.registries) == 0 { log.Warnf("Registry list is empty, nothing to delete") return } index, ok := c.getRegistryIndex(clusterID, providerID) if !ok { log.Warnf("Registry %s/%s is not found in the registries list, nothing to delete", providerID, clusterID) return } c.registries[index] = nil c.registries = append(c.registries[:index], c.registries[index+1:]...) log.Infof("%s registry for the cluster %s has been deleted.", providerID, clusterID) } // GetRegistries returns a copy of all registries func (c *Controller) GetRegistries() []serviceregistry.Instance { c.storeLock.RLock() defer c.storeLock.RUnlock() // copy registries to prevent race, no need to deep copy here. out := make([]serviceregistry.Instance, len(c.registries)) for i := range c.registries { out[i] = c.registries[i] } return out } func (c *Controller) getRegistryIndex(clusterID cluster.ID, provider provider.ID) (int, bool) { for i, r := range c.registries { if r.Cluster().Equals(clusterID) && r.Provider() == provider { return i, true } } return 0, false } // Services lists services from all platforms func (c *Controller) Services() []*model.Service { // smap is a map of hostname (string) to service index, used to identify services that // are installed in multiple clusters. smap := make(map[host.Name]int) index := 0 services := make([]*model.Service, 0) // Locking Registries list while walking it to prevent inconsistent results for _, r := range c.GetRegistries() { svcs := r.Services() if r.Provider() != provider.Kubernetes { index += len(svcs) services = append(services, svcs...) } else { for _, s := range svcs { previous, ok := smap[s.Hostname] if !ok { // First time we see a service. The result will have a single service per hostname // The first cluster will be listed first, so the services in the primary cluster // will be used for default settings. If a service appears in multiple clusters, // the order is less clear. smap[s.Hostname] = index index++ services = append(services, s) } else { // We must deepcopy before merge, and after merging, the ClusterVips length will be >= 2. // This is an optimization to prevent deepcopy multi-times if len(services[previous].ClusterVIPs.GetAddresses()) < 2 { // Deep copy before merging, otherwise there is a case // a service in remote cluster can be deleted, but the ClusterIP left. services[previous] = services[previous].DeepCopy() } // If it is seen second time, that means it is from a different cluster, update cluster VIPs. mergeService(services[previous], s, r) } } } } return services } // GetService retrieves a service by hostname if exists func (c *Controller) GetService(hostname host.Name) *model.Service { var out *model.Service for _, r := range c.GetRegistries() { service := r.GetService(hostname) if service == nil { continue } if r.Provider() != provider.Kubernetes { return service } if out == nil { out = service.DeepCopy() } else { // If we are seeing the service for the second time, it means it is available in multiple clusters. mergeService(out, service, r) } } return out } func mergeService(dst, src *model.Service, srcRegistry serviceregistry.Instance) { // Prefer the k8s HostVIPs where possible clusterID := srcRegistry.Cluster() if srcRegistry.Provider() == provider.Kubernetes || len(dst.ClusterVIPs.GetAddressesFor(clusterID)) == 0 { newAddresses := src.ClusterVIPs.GetAddressesFor(clusterID) dst.ClusterVIPs.SetAddressesFor(clusterID, newAddresses) } } // NetworkGateways merges the service-based cross-network gateways from each registry. func (c *Controller) NetworkGateways() []model.NetworkGateway { var gws []model.NetworkGateway for _, r := range c.GetRegistries() { gws = append(gws, r.NetworkGateways()...) } return gws } func (c *Controller) MCSServices() []model.MCSServiceInfo { var out []model.MCSServiceInfo for _, r := range c.GetRegistries() { out = append(out, r.MCSServices()...) } return out } // InstancesByPort retrieves instances for a service on a given port that match // any of the supplied labels. All instances match an empty label list. func (c *Controller) InstancesByPort(svc *model.Service, port int, labels labels.Instance) []*model.ServiceInstance { var instances []*model.ServiceInstance for _, r := range c.GetRegistries() { instances = append(instances, r.InstancesByPort(svc, port, labels)...) } return instances } func nodeClusterID(node *model.Proxy) cluster.ID { if node.Metadata == nil || node.Metadata.ClusterID == "" { return "" } return node.Metadata.ClusterID } // Skip the service registry when there won't be a match // because the proxy is in a different cluster. func skipSearchingRegistryForProxy(nodeClusterID cluster.ID, r serviceregistry.Instance) bool { // Always search non-kube (usually serviceentry) registry. // Check every registry if cluster ID isn't specified. if r.Provider() != provider.Kubernetes || nodeClusterID == "" { return false } return !r.Cluster().Equals(nodeClusterID) } // GetProxyServiceInstances lists service instances co-located with a given proxy func (c *Controller) GetProxyServiceInstances(node *model.Proxy) []*model.ServiceInstance { out := make([]*model.ServiceInstance, 0) nodeClusterID := nodeClusterID(node) for _, r := range c.GetRegistries() { if skipSearchingRegistryForProxy(nodeClusterID, r) { log.Debugf("GetProxyServiceInstances(): not searching registry %v: proxy %v CLUSTER_ID is %v", r.Cluster(), node.ID, nodeClusterID) continue } instances := r.GetProxyServiceInstances(node) if len(instances) > 0 { out = append(out, instances...) } } return out } func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance { clusterID := nodeClusterID(proxy) for _, r := range c.GetRegistries() { // If proxy clusterID unset, we may find incorrect workload label. // This can not happen in k8s env. if clusterID == "" { lbls := r.GetProxyWorkloadLabels(proxy) if lbls != nil { return lbls } } else if clusterID == r.Cluster() { // find proxy in the specified cluster lbls := r.GetProxyWorkloadLabels(proxy) if lbls != nil { return lbls } } } return nil } // Run starts all the controllers func (c *Controller) Run(stop <-chan struct{}) { c.storeLock.Lock() for _, r := range c.registries { // prefer the per-registry stop channel registryStop := stop if s := r.stop; s != nil { registryStop = s } go r.Run(registryStop) } c.running = true c.storeLock.Unlock() <-stop log.Info("Registry Aggregator terminated") } // HasSynced returns true when all registries have synced func (c *Controller) HasSynced() bool { for _, r := range c.GetRegistries() { if !r.HasSynced() { log.Debugf("registry %s is syncing", r.Cluster()) return false } } return true } func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) { c.handlers.AppendServiceHandler(f) } func (c *Controller) AppendWorkloadHandler(f func(*model.WorkloadInstance, model.Event)) { // Currently, it is not used. // Note: take care when you want to enable it, it will register the handlers to all registries // c.handlers.AppendWorkloadHandler(f) } func (c *Controller) AppendServiceHandlerForCluster(id cluster.ID, f func(*model.Service, model.Event)) { c.storeLock.Lock() defer c.storeLock.Unlock() handler, ok := c.handlersByCluster[id] if !ok { c.handlersByCluster[id] = &model.ControllerHandlers{} handler = c.handlersByCluster[id] } handler.AppendServiceHandler(f) } func (c *Controller) AppendWorkloadHandlerForCluster(id cluster.ID, f func(*model.WorkloadInstance, model.Event)) { c.storeLock.Lock() defer c.storeLock.Unlock() handler, ok := c.handlersByCluster[id] if !ok { c.handlersByCluster[id] = &model.ControllerHandlers{} handler = c.handlersByCluster[id] } handler.AppendWorkloadHandler(f) } func (c *Controller) UnRegisterHandlersForCluster(id cluster.ID) { c.storeLock.Lock() defer c.storeLock.Unlock() delete(c.handlersByCluster, id) } // GetIstioServiceAccounts implements model.ServiceAccounts operation. // The returned list contains all SPIFFE based identities that backs the service. // This method also expand the results from different registries based on the mesh config trust domain aliases. // To retain such trust domain expansion behavior, the xDS server implementation should wrap any (even if single) // service registry by this aggreated one. // For example, // - { "spiffe://cluster.local/bar@iam.gserviceaccount.com"}; when annotation is used on corresponding workloads. // - { "spiffe://cluster.local/ns/default/sa/foo" }; normal kubernetes cases // - { "spiffe://cluster.local/ns/default/sa/foo", "spiffe://trust-domain-alias/ns/default/sa/foo" }; // if the trust domain alias is configured. func (c *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string { out := map[string]struct{}{} for _, r := range c.GetRegistries() { svcAccounts := r.GetIstioServiceAccounts(svc, ports) for _, sa := range svcAccounts { out[sa] = struct{}{} } } result := make([]string, 0, len(out)) for k := range out { result = append(result, k) } tds := make([]string, 0) if c.meshHolder != nil { m := c.meshHolder.Mesh() if m != nil { tds = m.TrustDomainAliases } } expanded := spiffe.ExpandWithTrustDomains(result, tds) result = make([]string, 0, len(expanded)) for k := range expanded { result = append(result, k) } // Sort to make the return result deterministic. sort.Strings(result) return result }