pkg/kube/endpoint.go (183 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one or more // contributor license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright ownership. // The ASF licenses this file to You under the Apache License, Version 2.0 // (the "License"); you may not use this file except in compliance with // the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package kube import ( "errors" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" listerscorev1 "k8s.io/client-go/listers/core/v1" listersdiscoveryv1 "k8s.io/client-go/listers/discovery/v1" "k8s.io/client-go/tools/cache" "github.com/apache/apisix-ingress-controller/pkg/log" ) type HostPort struct { Host string Port int } // EndpointLister is an encapsulation for the lister of Kubernetes // Endpoint and EndpointSlice. type EndpointLister interface { // GetEndpoint fetches an Endpoint which entity is the Kubernetes // Endpoint according to the namespace and name. // Automatically choose from endpoint/endpointslice GetEndpoint(string, string) (Endpoint, error) } type endpointLister struct { useEndpointSlice bool epLister listerscorev1.EndpointsLister epsLister listersdiscoveryv1.EndpointSliceLister } func (lister *endpointLister) GetEndpoint(namespace, name string) (Endpoint, error) { if lister.useEndpointSlice { return lister.getEndpointSlices(namespace, name) } return lister.getEndpoint(namespace, name) } func (lister *endpointLister) getEndpoint(namespace, name string) (Endpoint, error) { if lister.epLister == nil { panic("not a endpoint lister") } ep, err := lister.epLister.Endpoints(namespace).Get(name) if err != nil { return nil, err } if ep == nil { log.Warnw("get endpoints but found nil", zap.String("namespace", namespace), zap.String("name", name), ) } return &endpoint{ endpointType: endpointTypeEndpoints, endpoint: ep, }, nil } func (lister *endpointLister) getEndpointSlices(namespace, svcName string) (Endpoint, error) { if lister.epsLister == nil { panic("not a endpointSlice lister") } selector := labels.SelectorFromSet(labels.Set{ discoveryv1.LabelServiceName: svcName, }) eps, err := lister.epsLister.EndpointSlices(namespace).List(selector) if err != nil { return nil, err } if len(eps) == 0 { log.Warnw("get endpoint slices but found empty slice", zap.String("namespace", namespace), zap.String("service", svcName), zap.Any("selector", selector), ) } return &endpoint{ endpointType: endpointTypeEndpointSlices, endpointSlices: eps, }, nil } // Endpoint is an encapsulation for the Kubernetes Endpoint and EndpointSlice objects. type Endpoint interface { // ServiceName returns the corresponding service owner of this endpoint. ServiceName() string // Namespace returns the residing namespace. Namespace() (string, error) // Endpoints returns the corresponding endpoints which matches the ServicePort. Endpoints(port *corev1.ServicePort) []HostPort } type endpointType string const ( endpointTypeEndpoints endpointType = "Endpoint" endpointTypeEndpointSlices endpointType = "EndpointSlices" ) type endpoint struct { endpointType endpointType endpoint *corev1.Endpoints endpointSlices []*discoveryv1.EndpointSlice } func (e *endpoint) ServiceName() string { if e.endpoint != nil { return e.endpoint.Name } return e.endpointSlices[0].Labels[discoveryv1.LabelServiceName] } func (e *endpoint) Namespace() (string, error) { switch e.endpointType { case endpointTypeEndpointSlices: if len(e.endpointSlices) > 0 { return e.endpointSlices[0].Namespace, nil } else { return "", errors.New("endpoint slice is empty") } case endpointTypeEndpoints: if e.endpoint != nil { return e.endpoint.Namespace, nil } else { return "", errors.New("endpoint is nil") } } return "", errors.New("unknown endpoint type " + string(e.endpointType)) } func (e *endpoint) Endpoints(svcPort *corev1.ServicePort) []HostPort { var addrs []HostPort if e.endpoint != nil { for _, subset := range e.endpoint.Subsets { epPort := -1 for _, subsetPort := range subset.Ports { if subsetPort.Name == svcPort.Name { epPort = int(subsetPort.Port) break } } if epPort != -1 { for _, addr := range subset.Addresses { addrs = append(addrs, HostPort{ Host: addr.IP, Port: epPort, }) } } } } else { for _, slice := range e.endpointSlices { epPort := -1 for _, slicePort := range slice.Ports { // TODO Consider the case that port not restricted. if slicePort.Name != nil && *slicePort.Name == svcPort.Name && slicePort.Port != nil { epPort = int(*slicePort.Port) break } } if epPort != -1 { for _, ep := range slice.Endpoints { if ep.Conditions.Ready != nil && !*ep.Conditions.Ready { // Ignore not ready endpoints. continue } for _, addr := range ep.Addresses { addrs = append(addrs, HostPort{ Host: addr, Port: epPort, }) } } } } } return addrs } // NewEndpointListerAndInformer creates an EndpointLister and the sharedIndexInformer. func NewEndpointListerAndInformer(factory informers.SharedInformerFactory, useEndpointSlice bool) (EndpointLister, cache.SharedIndexInformer) { var informer cache.SharedIndexInformer epLister := endpointLister{ useEndpointSlice: useEndpointSlice, } if !useEndpointSlice { epLister.epLister = factory.Core().V1().Endpoints().Lister() informer = factory.Core().V1().Endpoints().Informer() } else { epLister.epsLister = factory.Discovery().V1().EndpointSlices().Lister() informer = factory.Discovery().V1().EndpointSlices().Informer() } return &epLister, informer } // NewEndpoint creates an Endpoint which entity is Kubernetes Endpoints. func NewEndpoint(ep *corev1.Endpoints) Endpoint { return &endpoint{ endpointType: endpointTypeEndpoints, endpoint: ep, } } // NewEndpointWithSlice creates an Endpoint which entity is Kubernetes EndpointSlices. func NewEndpointWithSlice(ep *discoveryv1.EndpointSlice) Endpoint { return &endpoint{ endpointType: endpointTypeEndpointSlices, endpointSlices: []*discoveryv1.EndpointSlice{ep}, } }