pilot/pkg/serviceregistry/kube/controller/endpointslice.go (436 lines of code) (raw):

// Copyright Istio Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package controller import ( "sync" ) import ( "istio.io/api/label" v1 "k8s.io/api/discovery/v1" "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" klabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" listerv1 "k8s.io/client-go/listers/discovery/v1" listerv1beta1 "k8s.io/client-go/listers/discovery/v1beta1" "k8s.io/client-go/tools/cache" mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) import ( "github.com/apache/dubbo-go-pixiu/pilot/pkg/features" "github.com/apache/dubbo-go-pixiu/pilot/pkg/model" "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube" "github.com/apache/dubbo-go-pixiu/pilot/pkg/serviceregistry/kube/controller/filter" "github.com/apache/dubbo-go-pixiu/pkg/config/host" "github.com/apache/dubbo-go-pixiu/pkg/config/labels" kubelib "github.com/apache/dubbo-go-pixiu/pkg/kube" ) type endpointSliceController struct { kubeEndpoints endpointCache *endpointSliceCache useV1Resource bool } var _ kubeEndpointsController = &endpointSliceController{} var ( endpointSliceRequirement = labelRequirement(mcs.LabelServiceName, selection.DoesNotExist, nil) endpointSliceSelector = klabels.NewSelector().Add(*endpointSliceRequirement) ) func newEndpointSliceController(c *Controller) *endpointSliceController { // TODO Endpoints has a special cache, to filter out irrelevant updates to kube-system // Investigate if we need this, or if EndpointSlice is makes this not relevant useV1Resource := endpointSliceV1Available(c.client) var informer cache.SharedIndexInformer if useV1Resource { informer = c.client.KubeInformer().Discovery().V1().EndpointSlices().Informer() } else { informer = c.client.KubeInformer().Discovery().V1beta1().EndpointSlices().Informer() } filteredInformer := filter.NewFilteredSharedIndexInformer( c.opts.DiscoveryNamespacesFilter.Filter, informer, ) out := &endpointSliceController{ kubeEndpoints: kubeEndpoints{ c: c, informer: filteredInformer, }, useV1Resource: useV1Resource, endpointCache: newEndpointSliceCache(), } c.registerHandlers(filteredInformer, "EndpointSlice", out.onEvent, nil) return out } // TODO use this to automatically switch to EndpointSlice mode func endpointSliceV1Available(client kubelib.Client) bool { return client != nil && kubelib.IsAtLeastVersion(client, 21) } func (esc *endpointSliceController) getInformer() filter.FilteredSharedIndexInformer { return esc.informer } func (esc *endpointSliceController) listSlices(ns string, selector klabels.Selector) (slices []interface{}, err error) { if esc.useV1Resource { var eps []*v1.EndpointSlice eps, err = listerv1.NewEndpointSliceLister(esc.informer.GetIndexer()).EndpointSlices(ns).List(selector) slices = make([]interface{}, len(eps)) for i, ep := range eps { slices[i] = ep } } else { var eps []*v1beta1.EndpointSlice eps, err = listerv1beta1.NewEndpointSliceLister(esc.informer.GetIndexer()).EndpointSlices(ns).List(selector) slices = make([]interface{}, len(eps)) for i, ep := range eps { slices[i] = ep } } return } func (esc *endpointSliceController) onEvent(curr interface{}, event model.Event) error { ep, ok := curr.(metav1.Object) if !ok { tombstone, ok := curr.(cache.DeletedFinalStateUnknown) if !ok { log.Errorf("Couldn't get object from tombstone %#v", curr) return nil } epGvk, ok := tombstone.Obj.(runtime.Object) if !ok || epGvk.GetObjectKind().GroupVersionKind().Kind != "EndpointSlice" { log.Errorf("Tombstone contained an object that is not an endpoints slice %#v", curr) return nil } } esLabels := ep.GetLabels() if endpointSliceSelector.Matches(klabels.Set(esLabels)) { return processEndpointEvent(esc.c, esc, serviceNameForEndpointSlice(esLabels), ep.GetNamespace(), event, ep) } return nil } // GetProxyServiceInstances returns service instances co-located with a given proxy // TODO: this code does not return k8s service instances when the proxy's IP is a workload entry // To tackle this, we need a ip2instance map like what we have in service entry. func (esc *endpointSliceController) GetProxyServiceInstances(c *Controller, proxy *model.Proxy) []*model.ServiceInstance { eps, err := esc.listSlices(proxy.Metadata.Namespace, endpointSliceSelector) if err != nil { log.Errorf("Get endpointslice by index failed: %v", err) return nil } var out []*model.ServiceInstance for _, ep := range eps { instances := esc.sliceServiceInstances(c, ep, proxy) out = append(out, instances...) } return out } func serviceNameForEndpointSlice(labels map[string]string) string { return labels[v1beta1.LabelServiceName] } func (esc *endpointSliceController) sliceServiceInstances(c *Controller, slice interface{}, proxy *model.Proxy) []*model.ServiceInstance { var out []*model.ServiceInstance ep := wrapEndpointSlice(slice) if ep.AddressType() == v1.AddressTypeFQDN { // TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice return out } for _, svc := range c.servicesForNamespacedName(esc.getServiceNamespacedName(ep)) { pod := c.pods.getPodByProxy(proxy) builder := NewEndpointBuilder(c, pod) discoverabilityPolicy := c.exports.EndpointDiscoverabilityPolicy(svc) for _, port := range ep.Ports() { if port.Name == nil || port.Port == nil { continue } svcPort, exists := svc.Ports.Get(*port.Name) if !exists { continue } // consider multiple IP scenarios for _, ip := range proxy.IPAddresses { for _, ep := range ep.Endpoints() { for _, a := range ep.Addresses { if a == ip { istioEndpoint := builder.buildIstioEndpoint(ip, *port.Port, svcPort.Name, discoverabilityPolicy) out = append(out, &model.ServiceInstance{ Endpoint: istioEndpoint, ServicePort: svcPort, Service: svc, }) // If the endpoint isn't ready, report this if ep.Conditions.Ready != nil && !*ep.Conditions.Ready && c.opts.Metrics != nil { c.opts.Metrics.AddMetric(model.ProxyStatusEndpointNotReady, proxy.ID, proxy.ID, "") } } } } } } } return out } func (esc *endpointSliceController) forgetEndpoint(endpoint interface{}) map[host.Name][]*model.IstioEndpoint { slice := wrapEndpointSlice(endpoint) key := kube.KeyFunc(slice.Name, slice.Namespace) for _, e := range slice.Endpoints() { for _, a := range e.Addresses { esc.c.pods.endpointDeleted(key, a) } } out := make(map[host.Name][]*model.IstioEndpoint) for _, hostName := range esc.c.hostNamesForNamespacedName(esc.getServiceNamespacedName(slice)) { // endpointSlice cache update if esc.endpointCache.Has(hostName) { esc.endpointCache.Delete(hostName, slice.Name) out[hostName] = esc.endpointCache.Get(hostName) } } return out } func (esc *endpointSliceController) buildIstioEndpoints(es interface{}, hostName host.Name) []*model.IstioEndpoint { esc.updateEndpointCacheForSlice(hostName, es) return esc.endpointCache.Get(hostName) } func (esc *endpointSliceController) updateEndpointCacheForSlice(hostName host.Name, ep interface{}) { var endpoints []*model.IstioEndpoint slice := wrapEndpointSlice(ep) if slice.AddressType() == v1.AddressTypeFQDN { // TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice return } discoverabilityPolicy := esc.c.exports.EndpointDiscoverabilityPolicy(esc.c.GetService(hostName)) for _, e := range slice.Endpoints() { if !features.SendUnhealthyEndpoints { if e.Conditions.Ready != nil && !*e.Conditions.Ready { // Ignore not ready endpoints continue } } ready := e.Conditions.Ready == nil || *e.Conditions.Ready for _, a := range e.Addresses { pod, expectedPod := getPod(esc.c, a, &metav1.ObjectMeta{Name: slice.Name, Namespace: slice.Namespace}, e.TargetRef, hostName) if pod == nil && expectedPod { continue } builder := NewEndpointBuilder(esc.c, pod) // EDS and ServiceEntry use name for service port - ADS will need to map to numbers. for _, port := range slice.Ports() { var portNum int32 if port.Port != nil { portNum = *port.Port } var portName string if port.Name != nil { portName = *port.Name } istioEndpoint := builder.buildIstioEndpoint(a, portNum, portName, discoverabilityPolicy) if ready { istioEndpoint.HealthStatus = model.Healthy } else { istioEndpoint.HealthStatus = model.UnHealthy } endpoints = append(endpoints, istioEndpoint) } } } esc.endpointCache.Update(hostName, slice.Name, endpoints) } func (esc *endpointSliceController) buildIstioEndpointsWithService(name, namespace string, hostName host.Name, updateCache bool) []*model.IstioEndpoint { esLabelSelector := endpointSliceSelectorForService(name) slices, err := esc.listSlices(namespace, esLabelSelector) if err != nil || len(slices) == 0 { log.Debugf("endpoint slices of (%s, %s) not found => error %v", name, namespace, err) return nil } if updateCache { // A cache update was requested. Rebuild the endpoints for these slices. for _, slice := range slices { esc.updateEndpointCacheForSlice(hostName, slice) } } return esc.endpointCache.Get(hostName) } func (esc *endpointSliceController) getServiceNamespacedName(es interface{}) types.NamespacedName { slice := es.(metav1.Object) return types.NamespacedName{ Namespace: slice.GetNamespace(), Name: serviceNameForEndpointSlice(slice.GetLabels()), } } func (esc *endpointSliceController) InstancesByPort(c *Controller, svc *model.Service, reqSvcPort int, lbls labels.Instance) []*model.ServiceInstance { esLabelSelector := endpointSliceSelectorForService(svc.Attributes.Name) slices, err := esc.listSlices(svc.Attributes.Namespace, esLabelSelector) if err != nil { log.Infof("get endpoints(%s, %s) => error %v", svc.Attributes.Name, svc.Attributes.Namespace, err) return nil } if len(slices) == 0 { return nil } // Locate all ports in the actual service svcPort, exists := svc.Ports.GetByPort(reqSvcPort) if !exists { return nil } discoverabilityPolicy := c.exports.EndpointDiscoverabilityPolicy(svc) var out []*model.ServiceInstance for _, es := range slices { slice := wrapEndpointSlice(es) if slice.AddressType() == v1.AddressTypeFQDN { // TODO(https://github.com/istio/istio/issues/34995) support FQDN endpointslice continue } for _, e := range slice.Endpoints() { for _, a := range e.Addresses { var podLabels labels.Instance pod, expectedPod := getPod(c, a, &metav1.ObjectMeta{Name: slice.Name, Namespace: slice.Namespace}, e.TargetRef, svc.Hostname) if pod == nil && expectedPod { continue } if pod != nil { podLabels = pod.Labels } // check that one of the input labels is a subset of the labels if !lbls.SubsetOf(podLabels) { continue } builder := NewEndpointBuilder(esc.c, pod) // identify the port by name. K8S EndpointPort uses the service port name for _, port := range slice.Ports() { var portNum int32 if port.Port != nil { portNum = *port.Port } if port.Name == nil || svcPort.Name == *port.Name { istioEndpoint := builder.buildIstioEndpoint(a, portNum, svcPort.Name, discoverabilityPolicy) out = append(out, &model.ServiceInstance{ Endpoint: istioEndpoint, ServicePort: svcPort, Service: svc, }) } } } } } return out } // TODO this isn't used now, but we may still want to extract locality from the v1 EnspointSlice instead of node func getLocalityFromTopology(topology map[string]string) string { locality := topology[NodeRegionLabelGA] if _, f := topology[NodeZoneLabelGA]; f { locality += "/" + topology[NodeZoneLabelGA] } if _, f := topology[label.TopologySubzone.Name]; f { locality += "/" + topology[label.TopologySubzone.Name] } return locality } // endpointKey unique identifies an endpoint by IP and port name // This is used for deduping endpoints across slices. type endpointKey struct { ip string port string } type endpointSliceCache struct { mu sync.RWMutex endpointsByServiceAndSlice map[host.Name]map[string][]*model.IstioEndpoint } func newEndpointSliceCache() *endpointSliceCache { out := &endpointSliceCache{ endpointsByServiceAndSlice: make(map[host.Name]map[string][]*model.IstioEndpoint), } return out } func (e *endpointSliceCache) Update(hostname host.Name, slice string, endpoints []*model.IstioEndpoint) { e.mu.Lock() defer e.mu.Unlock() if len(endpoints) == 0 { delete(e.endpointsByServiceAndSlice[hostname], slice) } if _, f := e.endpointsByServiceAndSlice[hostname]; !f { e.endpointsByServiceAndSlice[hostname] = make(map[string][]*model.IstioEndpoint) } // We will always overwrite. A conflict here means an endpoint is transitioning // from one slice to another See // https://github.com/kubernetes/website/blob/master/content/en/docs/concepts/services-networking/endpoint-slices.md#duplicate-endpoints // In this case, we can always assume and update is fresh, although older slices // we have not gotten updates may be stale; therefor we always take the new // update. e.endpointsByServiceAndSlice[hostname][slice] = endpoints } func (e *endpointSliceCache) Delete(hostname host.Name, slice string) { e.mu.Lock() defer e.mu.Unlock() delete(e.endpointsByServiceAndSlice[hostname], slice) if len(e.endpointsByServiceAndSlice[hostname]) == 0 { delete(e.endpointsByServiceAndSlice, hostname) } } func (e *endpointSliceCache) Get(hostname host.Name) []*model.IstioEndpoint { e.mu.RLock() defer e.mu.RUnlock() var endpoints []*model.IstioEndpoint found := map[endpointKey]struct{}{} for _, eps := range e.endpointsByServiceAndSlice[hostname] { for _, ep := range eps { key := endpointKey{ep.Address, ep.ServicePortName} if _, f := found[key]; f { // This a duplicate. Update() already handles conflict resolution, so we don't // need to pick the "right" one here. continue } found[key] = struct{}{} endpoints = append(endpoints, ep) } } return endpoints } func (e *endpointSliceCache) Has(hostname host.Name) bool { e.mu.RLock() defer e.mu.RUnlock() _, found := e.endpointsByServiceAndSlice[hostname] return found } func endpointSliceSelectorForService(name string) klabels.Selector { return klabels.Set(map[string]string{ v1beta1.LabelServiceName: name, }).AsSelectorPreValidated().Add(*endpointSliceRequirement) } func wrapEndpointSlice(slice interface{}) *endpointSliceWrapper { switch es := slice.(type) { case *v1.EndpointSlice: return &endpointSliceWrapper{ObjectMeta: es.ObjectMeta, v1: es} case *v1beta1.EndpointSlice: return &endpointSliceWrapper{ObjectMeta: es.ObjectMeta, v1beta1: es} } return nil } type endpointSliceWrapper struct { metav1.ObjectMeta v1beta1 *v1beta1.EndpointSlice v1 *v1.EndpointSlice } func (esw *endpointSliceWrapper) AddressType() v1.AddressType { if esw.v1 != nil { return esw.v1.AddressType } return v1.AddressType(esw.v1beta1.AddressType) } func (esw *endpointSliceWrapper) Ports() []v1.EndpointPort { if esw.v1 != nil { return esw.v1.Ports } out := make([]v1.EndpointPort, len(esw.v1beta1.Ports)) for i, p := range esw.v1beta1.Ports { out[i] = v1.EndpointPort{ Name: p.Name, Protocol: p.Protocol, Port: p.Port, AppProtocol: p.AppProtocol, } } return out } func (esw *endpointSliceWrapper) Endpoints() []v1.Endpoint { if esw.v1 != nil { return esw.v1.Endpoints } out := make([]v1.Endpoint, len(esw.v1beta1.Endpoints)) for i, ep := range esw.v1beta1.Endpoints { zone := ep.Topology[NodeZoneLabelGA] var fz []v1.ForZone if ep.Hints != nil { fz = make([]v1.ForZone, len(ep.Hints.ForZones)) for i, el := range fz { fz[i] = v1.ForZone{Name: el.Name} } } out[i] = v1.Endpoint{ Addresses: ep.Addresses, Conditions: v1.EndpointConditions{ Ready: ep.Conditions.Ready, Serving: ep.Conditions.Serving, Terminating: ep.Conditions.Serving, }, Hostname: ep.Hostname, TargetRef: ep.TargetRef, DeprecatedTopology: ep.Topology, NodeName: ep.NodeName, Zone: &zone, Hints: &v1.EndpointHints{ ForZones: fz, }, } } return out }