// grpc-xds/control-plane-go/pkg/informers/manager.go

// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package informers

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/go-logr/logr"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
"k8s.io/client-go/kubernetes"
	informercache "k8s.io/client-go/tools/cache"

"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/applications"
"github.com/googlecloudplatform/solutions-workshops/grpc-xds/control-plane-go/pkg/xds"
)

var (
errMissingLabel = errors.New("missing service label")
errMissingMetadata = errors.New("missing metadata")
errNilEndpointSlice = errors.New("nil EndpointSlice")
errNoPortsInEndpointSlice = errors.New("no ports in EndpointSlice")
errUnexpectedType = errors.New("unexpected type")
	// Kubernetes Service ports with one of these names are treated as health
	// check ports (case-sensitive match). This port naming convention is
	// specific to this sample xDS control plane implementation; see the
	// example after this var block.
healthCheckPortNames = map[string]bool{
"health": true,
"healthz": true,
"healthCheck": true,
"healthcheck": true,
}
)
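
// For example (illustrative lookups), matching is case-sensitive:
//
//	healthCheckPortNames["healthz"] // true
//	healthCheckPortNames["Healthz"] // false: no case folding is applied
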
// Manager manages a collection of informers.
type Manager struct {
kubecontext string
clientset *kubernetes.Clientset
xdsCache *xds.SnapshotCache
}

// NewManager creates an instance that manages a collection of informers
// for one kubecontext.
func NewManager(ctx context.Context, kubecontextName string, xdsCache *xds.SnapshotCache) (*Manager, error) {
clientset, err := NewClientSet(ctx, kubecontextName)
if err != nil {
return nil, err
}
return &Manager{
kubecontext: kubecontextName,
clientset: clientset,
xdsCache: xdsCache,
}, nil
}

func (m *Manager) AddEndpointSliceInformer(ctx context.Context, logger logr.Logger, config Config) error {
logger = logger.WithValues("kubecontext", m.kubecontext, "namespace", config.Namespace)
if config.Services == nil {
config.Services = make([]string, 0)
}
labelSelector := fmt.Sprintf("%s in (%s)", discoveryv1.LabelServiceName, strings.Join(config.Services, ", "))
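	// For illustration: with config.Services = []string{"greeter-intermediary",
	// "greeter-leaf"} (hypothetical Service names), labelSelector is
	// "kubernetes.io/service-name in (greeter-intermediary, greeter-leaf)".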
logger.V(2).Info("Creating informer for EndpointSlices", "labelSelector", labelSelector)
stop := make(chan struct{})
go func() {
<-ctx.Done()
logger.V(1).Info("Stopping informer for EndpointSlices", "labelSelector", labelSelector)
close(stop)
}()
factory := informers.NewSharedInformerFactory(m.clientset, 0)
informer := factory.InformerFor(&discoveryv1.EndpointSlice{}, func(clientSet kubernetes.Interface, resyncPeriod time.Duration) informercache.SharedIndexInformer {
indexers := informercache.Indexers{informercache.NamespaceIndex: informercache.MetaNamespaceIndexFunc}
return discoveryinformers.NewFilteredEndpointSliceInformer(clientSet, config.Namespace, resyncPeriod, indexers, func(listOptions *metav1.ListOptions) {
listOptions.LabelSelector = labelSelector
})
})
_, err := informer.AddEventHandler(informercache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
logger := logger.WithValues("event", "add")
logEndpointSlice(logger, obj)
apps := getAppsForInformer(logger, informer)
m.handleEndpointSliceEvent(ctx, logger, config.Namespace, apps)
},
UpdateFunc: func(_, obj interface{}) {
logger := logger.WithValues("event", "update")
logEndpointSlice(logger, obj)
apps := getAppsForInformer(logger, informer)
m.handleEndpointSliceEvent(ctx, logger, config.Namespace, apps)
},
DeleteFunc: func(obj interface{}) {
logger := logger.WithValues("event", "delete")
logEndpointSlice(logger, obj)
apps := getAppsForInformer(logger, informer)
m.handleEndpointSliceEvent(ctx, logger, config.Namespace, apps)
},
})
if err != nil {
return fmt.Errorf("could not add informer event handler for kubecontext=%s namespace=%s services=%+v: %w", m.kubecontext, config.Namespace, config.Services, err)
}
go func() {
logger.V(2).Info("Starting informer", "services", config.Services)
informer.Run(stop)
}()
return nil
}
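
// A minimal usage sketch for the informer manager, assuming a logr.Logger
// named logger, a context ctx, and an xds.SnapshotCache named xdsCache
// (the Config fields shown are the ones used above; the kubecontext,
// namespace, and service names are hypothetical):
//
//	manager, err := NewManager(ctx, "my-kubecontext", xdsCache)
//	if err != nil {
//		// handle the error
//	}
//	cfg := Config{Namespace: "default", Services: []string{"greeter"}}
//	if err := manager.AddEndpointSliceInformer(ctx, logger, cfg); err != nil {
//		// handle the error
//	}
//	// The informer runs in a goroutine until ctx is canceled.
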
func logEndpointSlice(logger logr.Logger, obj interface{}) {
if logger.V(4).Enabled() {
jsonBytes, err := json.MarshalIndent(obj, "", " ")
if err != nil {
			logger.Error(err, "could not marshal EndpointSlice to JSON", "endpointSlice", obj)
			return
		}
logger.V(4).Info("Informer", "endpointSlice", string(jsonBytes))
}
}

func (m *Manager) handleEndpointSliceEvent(ctx context.Context, logger logr.Logger, namespace string, apps []applications.Application) {
logger.V(2).Info("Informer resource update", "apps", apps)
if err := m.xdsCache.UpdateResources(ctx, logger, m.kubecontext, namespace, apps); err != nil {
// Can't propagate this error, and we probably shouldn't end the goroutine anyway.
logger.Error(err, "Could not update the xDS resource cache with gRPC application configuration", "apps", apps)
}
}

func getAppsForInformer(logger logr.Logger, informer informercache.SharedIndexInformer) []applications.Application {
var apps []applications.Application
for _, eps := range informer.GetIndexer().List() {
endpointSlice, err := validateEndpointSlice(eps)
if err != nil {
logger.Error(err, "Skipping EndpointSlice")
continue
}
k8sServiceName := endpointSlice.GetObjectMeta().GetLabels()[discoveryv1.LabelServiceName]
namespace := endpointSlice.GetObjectMeta().GetNamespace()
servingPort := findServingPort(endpointSlice)
healthCheckPort, exists := findHealthCheckPort(endpointSlice)
if !exists {
// Default to using the serving port for health checks.
healthCheckPort = servingPort
}
servingProtocol := findProtocol(servingPort)
healthCheckProtocol := findProtocol(healthCheckPort)
appEndpoints := getApplicationEndpoints(endpointSlice)
app := applications.NewApplication(namespace, k8sServiceName, uint32(*servingPort.Port), servingProtocol, uint32(*healthCheckPort.Port), healthCheckProtocol, appEndpoints)
apps = append(apps, app)
}
return apps
}

// findProtocol returns the protocol of the provided port, in all lowercase, determined by considering the following:
//
// 1. The [appProtocol](https://kubernetes.io/docs/concepts/services-networking/service/#application-protocol), if set.
// 2. The [protocol](https://kubernetes.io/docs/reference/networking/service-protocols/#protocol-support), if set.
// 3. The default value of `tcp`.
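//
// Illustrative calls (values assumed for the example):
//
//	grpcProto := "grpc"
//	findProtocol(discoveryv1.EndpointPort{AppProtocol: &grpcProto}) // "grpc"
//	findProtocol(discoveryv1.EndpointPort{})                        // "tcp" (default)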
func findProtocol(port discoveryv1.EndpointPort) string {
if port.AppProtocol != nil {
return strings.ToLower(*port.AppProtocol)
}
if port.Protocol != nil {
return strings.ToLower(string(*port.Protocol))
}
return "tcp"
}

// findServingPort returns the first port with a port number that is not named
// as a health check port. If the EndpointSlice has only one port, that port is
// returned regardless of its name.
func findServingPort(endpointSlice *discoveryv1.EndpointSlice) discoveryv1.EndpointPort {
for _, endpointPort := range endpointSlice.Ports {
if endpointPort.Port != nil && (endpointPort.Name == nil || !healthCheckPortNames[*endpointPort.Name]) {
return endpointPort
}
}
	// If all ports are named as health check ports (or lack port numbers),
	// fall back to the first port that has a port number, so that callers
	// can safely dereference Port.
	for _, endpointPort := range endpointSlice.Ports {
		if endpointPort.Port != nil {
			return endpointPort
		}
	}
	return endpointSlice.Ports[0]
}

// findHealthCheckPort returns the first port with a port number that is named
// as a health check port. The second return value is false if no such port
// exists.
func findHealthCheckPort(endpointSlice *discoveryv1.EndpointSlice) (discoveryv1.EndpointPort, bool) {
for _, endpointPort := range endpointSlice.Ports {
		if endpointPort.Name != nil && endpointPort.Port != nil && healthCheckPortNames[*endpointPort.Name] {
return endpointPort, true
}
}
return discoveryv1.EndpointPort{}, false
}
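
// Illustrative classification of a two-port EndpointSlice (port names and
// numbers are hypothetical; ptrTo stands for a helper that returns a pointer
// to its argument):
//
//	ports := []discoveryv1.EndpointPort{
//		{Name: ptrTo("healthz"), Port: ptrTo(int32(8081))},
//		{Name: ptrTo("grpc"), Port: ptrTo(int32(50051))},
//	}
//	// findServingPort     -> the "grpc" port (first port not named as a health check port)
//	// findHealthCheckPort -> the "healthz" port, true
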
// getApplicationEndpoints returns the ready endpoints of the EndpointSlice as
// applications.ApplicationEndpoints values.
func getApplicationEndpoints(endpointSlice *discoveryv1.EndpointSlice) []applications.ApplicationEndpoints {
var appEndpoints []applications.ApplicationEndpoints
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready {
var k8sNode, zone string
if endpoint.NodeName != nil {
k8sNode = *endpoint.NodeName
}
if endpoint.Zone != nil {
zone = *endpoint.Zone
}
appEndpoints = append(appEndpoints, applications.NewApplicationEndpoints(k8sNode, zone, endpoint.Addresses, applications.EndpointStatusFromConditions(endpoint.Conditions)))
}
}
return appEndpoints
}
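
// Illustrative input and outcome (field values are hypothetical):
//
//	ready := true
//	node, zone := "gke-node-1", "us-central1-a"
//	endpoint := discoveryv1.Endpoint{
//		Addresses:  []string{"10.0.0.1"},
//		Conditions: discoveryv1.EndpointConditions{Ready: &ready},
//		NodeName:   &node,
//		Zone:       &zone,
//	}
//	// getApplicationEndpoints copies the node name, zone, and addresses of
//	// this ready endpoint into an applications.ApplicationEndpoints value,
//	// along with a status derived from the endpoint conditions.
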
// validateEndpointSlice ensures that the EndpointSlice contains the fields
// required to turn it into an applications.Application instance.
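//
// Illustrative failures (the sentinel errors are defined at the top of this file):
//
//	validateEndpointSlice(nil)                          // errNilEndpointSlice
//	validateEndpointSlice("not an EndpointSlice")       // errUnexpectedType
//	validateEndpointSlice(&discoveryv1.EndpointSlice{}) // errMissingMetadata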
func validateEndpointSlice(eps interface{}) (*discoveryv1.EndpointSlice, error) {
if eps == nil {
return nil, errNilEndpointSlice
}
endpointSlice, ok := eps.(*discoveryv1.EndpointSlice)
if !ok {
return nil, fmt.Errorf("%w: expected *discoveryv1.EndpointSlice, got %T", errUnexpectedType, eps)
}
if endpointSlice.GetObjectMeta().GetName() == "" ||
endpointSlice.GetObjectMeta().GetNamespace() == "" {
return nil, fmt.Errorf("%w from EndpointSlice %+v", errMissingMetadata, endpointSlice)
}
if endpointSlice.GetObjectMeta().GetLabels() == nil ||
len(endpointSlice.GetObjectMeta().GetLabels()[discoveryv1.LabelServiceName]) == 0 {
return nil, fmt.Errorf("%w from EndpointSlice %+v", errMissingLabel, endpointSlice)
}
if len(endpointSlice.Ports) == 0 {
return nil, fmt.Errorf("%w: %+v", errNoPortsInEndpointSlice, endpointSlice)
}
nonNilPortNumberExists := false
for _, endpointPort := range endpointSlice.Ports {
if endpointPort.Port != nil {
nonNilPortNumberExists = true
}
}
if !nonNilPortNumberExists {
return nil, fmt.Errorf("%w: %+v", errNoPortsInEndpointSlice, endpointSlice)
}
return endpointSlice, nil
}