pkg/controller/ingress/concurrency_watchdog.go

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package ingress

import (
	"bytes"
	"container/ring"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	approutingv1alpha1 "github.com/Azure/aks-app-routing-operator/api/v1alpha1"
	"github.com/Azure/aks-app-routing-operator/pkg/controller/nginxingress"
	"github.com/go-logr/logr"
	"github.com/hashicorp/go-multierror"
	prommodel "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	corev1 "k8s.io/api/core/v1"
	policyv1beta1 "k8s.io/api/policy/v1beta1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/Azure/aks-app-routing-operator/pkg/config"
	"github.com/Azure/aks-app-routing-operator/pkg/controller/controllername"
	"github.com/Azure/aks-app-routing-operator/pkg/controller/metrics"
	"github.com/Azure/aks-app-routing-operator/pkg/util"
)

var concurrencyWatchdogControllerName = controllername.New("concurrency", "watchdog")

// ScrapeFn returns the connection count for the given pod
type ScrapeFn func(ctx context.Context, client rest.Interface, pod *corev1.Pod) (float64, error)

// NginxScrapeFn is the scrape function for Nginx
func NginxScrapeFn(ctx context.Context, client rest.Interface, pod *corev1.Pod) (float64, error) {
	lgr := logr.FromContextOrDiscard(ctx)
	lgr.Info("scraping pod", "pod", pod.Name)

	resp, err := client.Get().
		AbsPath("/api/v1/namespaces", pod.Namespace, "pods", pod.Name+":10254", "proxy/metrics").
		Timeout(time.Second * 30).
		MaxRetries(4).
		DoRaw(ctx)
	if err != nil {
		return 0, err
	}

	// Decode the scraped OpenMetrics payload and return the value of the
	// active-connections gauge exposed by ingress-nginx.
	family := &prommodel.MetricFamily{}
	format, err := expfmt.NewOpenMetricsFormat(expfmt.OpenMetricsVersion_0_0_1)
	if err != nil {
		return 0, fmt.Errorf("creating open metrics format: %w", err)
	}
	dec := expfmt.NewDecoder(bytes.NewReader(resp), format)
	for {
		err = dec.Decode(family)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return 0, err
		}
		if family.GetName() != "nginx_ingress_controller_nginx_process_connections" {
			continue
		}
		for _, metric := range family.Metric {
			if metric.Gauge == nil || !metricHasLabel(metric, "state", "active") {
				continue
			}
			return metric.Gauge.GetValue(), nil
		}
	}

	return 0, fmt.Errorf("active connections metric not found")
}

// WatchdogTarget refers to a target the concurrency watchdog should track
type WatchdogTarget struct {
	ScrapeFn  ScrapeFn
	PodLabels map[string]string
}

// ListWatchdogTargets returns the set of targets the watchdog should scrape.
type ListWatchdogTargets func() ([]WatchdogTarget, error)

// GetListNginxWatchdogTargets returns a ListWatchdogTargets that derives a target
// from every NginxIngressController in the cluster.
func GetListNginxWatchdogTargets(cl client.Client, defaultNicControllerClass string) ListWatchdogTargets {
	return func() ([]WatchdogTarget, error) {
		nics := &approutingv1alpha1.NginxIngressControllerList{}
		if err := cl.List(context.Background(), nics); err != nil {
			return nil, fmt.Errorf("listing NginxIngressController objects: %w", err)
		}

		var targets []WatchdogTarget
		for _, nic := range nics.Items {
			ingCfg := nginxingress.ToNginxIngressConfig(&nic, defaultNicControllerClass)
			if ingCfg == nil {
				continue
			}

			targets = append(targets, WatchdogTarget{
				ScrapeFn:  NginxScrapeFn,
				PodLabels: ingCfg.PodLabels(),
			})
		}

		return targets, nil
	}
}
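
// ScrapeFn and ListWatchdogTargets above are the watchdog's extension points.
// As a rough illustration of the contract they imply — not something this
// operator ships — a fixed-target implementation could look like the sketch
// below. The function name and the idea of supplying static labels are
// hypothetical; only NginxScrapeFn and the types come from this file.
func staticNginxWatchdogTargets(podLabels map[string]string) ListWatchdogTargets {
	return func() ([]WatchdogTarget, error) {
		// Always report a single target scraped with the stock Nginx scraper.
		return []WatchdogTarget{{
			ScrapeFn:  NginxScrapeFn,
			PodLabels: podLabels,
		}}, nil
	}
}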
// ConcurrencyWatchdog evicts ingress controller pods that have too many active connections relative to others.
// This helps redistribute long-running connections when the ingress controller scales up.
type ConcurrencyWatchdog struct {
	client              client.Client
	clientset           kubernetes.Interface
	restClient          rest.Interface
	logger              logr.Logger
	config              *config.Config
	listWatchdogTargets ListWatchdogTargets

	interval, minPodAge, voteTTL time.Duration
	minVotesBeforeEviction       int
	minPercentOverAvgBeforeVote  float64

	votes *ring.Ring
}

// NewConcurrencyWatchdog registers the watchdog as a runnable with the manager.
func NewConcurrencyWatchdog(manager ctrl.Manager, conf *config.Config, target ListWatchdogTargets) error {
	metrics.InitControllerMetrics(concurrencyWatchdogControllerName)

	clientset, err := kubernetes.NewForConfig(manager.GetConfig())
	if err != nil {
		return err
	}

	c := &ConcurrencyWatchdog{
		client:              manager.GetClient(),
		clientset:           clientset,
		restClient:          clientset.CoreV1().RESTClient(),
		logger:              concurrencyWatchdogControllerName.AddToLogger(manager.GetLogger()),
		config:              conf,
		listWatchdogTargets: target,

		interval:                    time.Minute,
		minPodAge:                   time.Minute * 5,
		minVotesBeforeEviction:      conf.ConcurrencyWatchdogVotes,
		minPercentOverAvgBeforeVote: conf.ConcurrencyWatchdogThres,
		voteTTL:                     time.Minute * 10,
		votes:                       ring.New(20),
	}

	return manager.Add(c)
}

// Start runs the watchdog loop until the context is cancelled, ticking on a jittered interval.
func (c *ConcurrencyWatchdog) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(util.Jitter(c.interval, 0.3)):
		}
		if err := c.tick(ctx); err != nil {
			c.logger.Error(err, "error reconciling ingress controller resources")
			continue
		}
	}
}

func (c *ConcurrencyWatchdog) tick(ctx context.Context) error {
	lgr := c.logger
	start := time.Now()

	var retErr *multierror.Error
	defer func() {
		lgr.Info("finished checking on ingress controller pods", "latencySec", time.Since(start).Seconds())

		// placing this call inside a closure allows for result and err to be bound after tick executes
		// this makes sure they have the proper value
		// just calling defer metrics.HandleControllerReconcileMetrics(controllerName, result, err) would bind
		// the values of result and err to their zero values, since they were just instantiated
		metrics.HandleControllerReconcileMetrics(concurrencyWatchdogControllerName, ctrl.Result{}, retErr.ErrorOrNil())
	}()

	lgr.Info("listing watchdog targets")
	targets, err := c.listWatchdogTargets()
	if err != nil {
		lgr.Error(err, "listing watchdog targets")
		retErr = multierror.Append(retErr, fmt.Errorf("listing watchdog targets: %w", err))
		return retErr.ErrorOrNil()
	}

	for _, target := range targets {
		lgr := c.logger.WithValues("target", target.PodLabels)
		lgr.Info("starting checking on ingress controller pods")

		lgr.Info("listing pods")
		list := &corev1.PodList{}
		err := c.client.List(ctx, list, client.InNamespace(c.config.NS), client.MatchingLabels(target.PodLabels))
		if err != nil {
			lgr.Error(err, "listing pods")
			retErr = multierror.Append(retErr, fmt.Errorf("listing pods: %w", err))
			continue
		}

		lgr.Info("gathering metrics for each pod")
		connectionCountByPod := make([]float64, len(list.Items))
		nReadyPods := 0
		var totalConnectionCount float64
		for i, pod := range list.Items {
			lgr := lgr.WithValues("pod", pod.Name, "namespace", pod.Namespace)
			if !podIsReady(&pod) {
				lgr.Info("pod is not ready", "name", pod.Name)
				continue
			}
			nReadyPods++

			ctx := logr.NewContext(ctx, lgr)
			count, err := target.ScrapeFn(ctx, c.restClient, &pod)
			if err != nil {
				// check if pod is still ready. Pod might have become unready after checking it (this solves a race condition).
				// we ignore an error on the podIsActive call, we want the retErr to be the error from scraping, not from checking if
				// the pod is active.
				if active, err := podIsActive(ctx, lgr, c.client, client.ObjectKeyFromObject(&pod)); err == nil && !active {
					lgr.Info("pod isn't active anymore")
					continue
				}

				lgr.Error(err, "scraping pod")
				retErr = multierror.Append(retErr, fmt.Errorf("scraping pod %q: %w", pod.Name, err))
				continue
			}
			connectionCountByPod[i] = count
			totalConnectionCount += count
		}
		avgConnectionCount := totalConnectionCount / float64(nReadyPods)

		// Only rebalance connections when three or more replicas are ready.
		// Otherwise we will just push the connections to the other replica.
		if nReadyPods < 3 {
			lgr.Info("not enough ready pods to rebalance connections", "readyPods", nReadyPods)
			continue
		}

		lgr.Info("processing votes")
		pod := c.processVotes(list, connectionCountByPod, avgConnectionCount)
		if pod == "" {
			lgr.Info("no pod to evict")
			continue
		}

		lgr.Info("evicting pod due to high relative connection concurrency", "name", pod)
		eviction := &policyv1beta1.Eviction{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pod,
				Namespace: c.config.NS,
			},
		}
		if err := c.clientset.CoreV1().Pods(eviction.Namespace).EvictV1beta1(ctx, eviction); err != nil {
			lgr.Error(err, "unable to evict pod", "name", pod)
			// don't add the error to return since we shouldn't retry right away
		}
	}

	if err := retErr.ErrorOrNil(); err != nil {
		c.logger.Error(err, "reconciling ingress controller resources")
		return err
	}

	return nil
}

// processVotes records eviction votes for pods whose connection count is well above the
// average and returns the name of a pod that has collected enough votes to be evicted,
// or "" if no pod qualifies.
func (c *ConcurrencyWatchdog) processVotes(list *corev1.PodList, connectionCountByPod []float64, avgConnectionCount float64) string {
	// Vote on outlier(s)
	podsByName := map[string]struct{}{}
	for i, pod := range list.Items {
		podsByName[pod.Name] = struct{}{}

		rank := (connectionCountByPod[i] / avgConnectionCount) * 100
		if rank < c.minPercentOverAvgBeforeVote || time.Since(pod.CreationTimestamp.Time) < c.minPodAge {
			continue
		}
		c.logger.Info("voting to evict pod due to high connection concurrency", "name", pod.Name, "percentOfAvg", rank)

		c.votes = c.votes.Next()
		var vote *evictionVote
		if c.votes.Value == nil {
			vote = &evictionVote{}
			c.votes.Value = vote
		} else {
			vote = c.votes.Value.(*evictionVote)
		}

		vote.PodName = pod.Name
		vote.Time = time.Now()
	}

	// Aggregate votes
	votesPerPod := map[string]int{}
	c.votes.Do(func(cur interface{}) {
		vote, ok := cur.(*evictionVote)
		if !ok {
			return
		}
		if _, exists := podsByName[vote.PodName]; !exists || time.Since(vote.Time) > c.voteTTL {
			return
		}
		votesPerPod[vote.PodName]++
	})

	// Apply votes
	for pod, votes := range votesPerPod {
		if votes < c.minVotesBeforeEviction {
			continue
		}
		return pod
	}

	return ""
}

func (c *ConcurrencyWatchdog) NeedLeaderElection() bool {
	return true
}

type evictionVote struct {
	Time    time.Time
	PodName string
}

func metricHasLabel(metric *prommodel.Metric, key, value string) bool {
	for _, cur := range metric.Label {
		if cur.GetName() == key && cur.GetValue() == value {
			return true
		}
	}
	return false
}

func podIsReady(pod *corev1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// podIsActive checks that the Pod still exists, is Ready, and is able to serve connections
func podIsActive(ctx context.Context, lgr logr.Logger, cl client.Client, pod client.ObjectKey) (bool, error) {
	var obj corev1.Pod
	err := cl.Get(ctx, pod, &obj)
	if apierrors.IsNotFound(err) {
		lgr.Info("pod doesn't exist anymore")
		return false, nil
	}
	if err != nil {
		lgr.Error(err, "failed to get pod")
		return false, fmt.Errorf("getting pod: %w", err)
	}

	if !podIsReady(&obj) {
		lgr.Info("pod isn't ready anymore")
		return false, nil
	}

	return true, nil
}
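
// A minimal wiring sketch, assuming a controller-runtime manager and operator
// config have already been constructed elsewhere. The function name and the
// caller-supplied defaultNicControllerClass parameter are illustrative; only
// GetListNginxWatchdogTargets and NewConcurrencyWatchdog come from this file.
func addConcurrencyWatchdog(mgr ctrl.Manager, conf *config.Config, defaultNicControllerClass string) error {
	// Derive watchdog targets from NginxIngressController objects, then
	// register the watchdog as a runnable so the manager drives its Start loop.
	targets := GetListNginxWatchdogTargets(mgr.GetClient(), defaultNicControllerClass)
	return NewConcurrencyWatchdog(mgr, conf, targets)
}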