func()

in pkg/controller/ingress/concurrency_watchdog.go [174:273]


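// tick runs one pass of the concurrency watchdog: for each watchdog target it lists the
// matching ingress controller pods, scrapes their connection counts, and, when at least
// three replicas are ready, evicts the pod that processVotes selects for disproportionately
// high connection concurrency so connections can rebalance across replicas. Errors are
// aggregated into a multierror and reported through the controller reconcile metrics.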
func (c *ConcurrencyWatchdog) tick(ctx context.Context) error {
	lgr := c.logger
	start := time.Now()
	var retErr *multierror.Error
	defer func() {
		lgr.Info("finished checking on ingress controller pods", "latencySec", time.Since(start).Seconds())

		// Wrapping this call in a closure defers reading retErr until tick returns, so the
		// metric records the final aggregated error. Writing
		// defer metrics.HandleControllerReconcileMetrics(concurrencyWatchdogControllerName, ctrl.Result{}, retErr.ErrorOrNil())
		// directly would evaluate the arguments immediately and record retErr's zero value (nil).
		metrics.HandleControllerReconcileMetrics(concurrencyWatchdogControllerName, ctrl.Result{}, retErr.ErrorOrNil())
	}()

	lgr.Info("listing watchdog targets")
	targets, err := c.listWatchdogTargets()
	if err != nil {
		lgr.Error(err, "listing watchdog targets")
		retErr = multierror.Append(retErr, fmt.Errorf("listing watchdog targets: %w", err))
		return retErr.ErrorOrNil()
	}

	for _, target := range targets {
		lgr := c.logger.WithValues("target", target.PodLabels)
		lgr.Info("starting checking on ingress controller pods")

		lgr.Info("listing pods")
		list := &corev1.PodList{}
		err := c.client.List(ctx, list, client.InNamespace(c.config.NS), client.MatchingLabels(target.PodLabels))
		if err != nil {
			lgr.Error(err, "listing pods")
			retErr = multierror.Append(retErr, fmt.Errorf("listing pods: %w", err))
			continue
		}

		lgr.Info("gathering metrics for each pod")
		connectionCountByPod := make([]float64, len(list.Items))
		nReadyPods := 0
		var totalConnectionCount float64
		for i, pod := range list.Items {
			lgr := lgr.WithValues("pod", pod.Name, "namespace", pod.Namespace)
			if !podIsReady(&pod) {
				lgr.Info("pod is not ready", "name", pod.Name)
				continue
			}
			nReadyPods++
			ctx := logr.NewContext(ctx, lgr)
			count, err := target.ScrapeFn(ctx, c.restClient, &pod)
			if err != nil {

				// Check whether the pod is still active; it may have become unready after the readiness
				// check above (this closes a race condition). Errors from podIsActive are intentionally
				// ignored so that retErr reflects the scraping error, not the activity check.
				if active, err := podIsActive(ctx, lgr, c.client, client.ObjectKeyFromObject(&pod)); err == nil && !active {
					lgr.Info("pod isn't active anymore")
					continue
				}

				lgr.Error(err, "scraping pod")
				retErr = multierror.Append(retErr, fmt.Errorf("scraping pod %q: %w", pod.Name, err))
				continue
			}
			connectionCountByPod[i] = count
			totalConnectionCount += count
		}
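		// NaN when no pods are ready (0/0); harmless, since the nReadyPods check below
		// skips this target before the average is used.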
		avgConnectionCount := totalConnectionCount / float64(nReadyPods)

		// Only rebalance connections when three or more replicas are ready. With fewer,
		// evicting a pod would simply push its connections onto the single remaining replica.
		if nReadyPods < 3 {
			lgr.Info("not enough ready pods to rebalance connections", "readyPods", nReadyPods)
			continue
		}

		lgr.Info("processing votes")
		pod := c.processVotes(list, connectionCountByPod, avgConnectionCount)
		if pod == "" {
			lgr.Info("no pod to evict")
			continue
		}

		lgr.Info("evicting pod due to high relative connection concurrency", "name", pod)
		eviction := &policyv1beta1.Eviction{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pod,
				Namespace: c.config.NS,
			},
		}

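		// Evict through the eviction subresource so that PodDisruptionBudgets are honored.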
		if err := c.clientset.CoreV1().Pods(eviction.Namespace).EvictV1beta1(ctx, eviction); err != nil {
			lgr.Error(err, "unable to evict pod", "name", pod)
			// Don't append this error to retErr; eviction shouldn't be retried right away.
		}
	}
	if err := retErr.ErrorOrNil(); err != nil {
		c.logger.Error(err, "reconciling ingress controller resources")
		return err
	}
	return nil
}
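
For illustration only: a minimal, standalone sketch (hypothetical names, not part of this package) of the defer-binding gotcha that the closure at the top of tick guards against. Arguments to a deferred call are evaluated at defer time, so deferring the metrics call directly would always record nil; wrapping it in a closure reads retErr after the function body has run.

package main

import "fmt"

func record(err error) { fmt.Println("recorded:", err) }

func direct() error {
	var retErr error
	defer record(retErr) // retErr is evaluated here, so nil is always recorded
	retErr = fmt.Errorf("boom")
	return retErr
}

func wrapped() error {
	var retErr error
	defer func() { record(retErr) }() // retErr is read when the closure runs, after the assignment
	retErr = fmt.Errorf("boom")
	return retErr
}

func main() {
	_ = direct()  // prints "recorded: <nil>"
	_ = wrapped() // prints "recorded: boom"
}

The same pattern is why tick invokes metrics.HandleControllerReconcileMetrics inside the deferred closure: retErr.ErrorOrNil() is read only after every error appended during the run is in place.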