in pkg/controller/ingress/concurrency_watchdog.go [174:273]
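// tick performs a single watchdog pass: it scrapes connection counts from the
// ingress controller pods of each watchdog target and evicts a pod whose
// connection concurrency is disproportionately high relative to its peers.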
func (c *ConcurrencyWatchdog) tick(ctx context.Context) error {
	lgr := c.logger
	start := time.Now()

	var retErr *multierror.Error
	defer func() {
		lgr.Info("finished checking on ingress controller pods", "latencySec", time.Since(start).Seconds())

		// placing this call inside a closure allows for result and err to be bound after tick executes
		// this makes sure they have the proper value
		// just calling defer metrics.HandleControllerReconcileMetrics(controllerName, result, err) would bind
		// the values of result and err to their zero values, since they were just instantiated
		metrics.HandleControllerReconcileMetrics(concurrencyWatchdogControllerName, ctrl.Result{}, retErr.ErrorOrNil())
	}()

	lgr.Info("listing watchdog targets")
	targets, err := c.listWatchdogTargets()
	if err != nil {
		lgr.Error(err, "listing watchdog targets")
		retErr = multierror.Append(retErr, fmt.Errorf("listing watchdog targets: %w", err))
		return retErr.ErrorOrNil()
	}
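
	// Evaluate each watchdog target's set of ingress controller pods independently.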
	for _, target := range targets {
		lgr := c.logger.WithValues("target", target.PodLabels)
		lgr.Info("starting checking on ingress controller pods")

		lgr.Info("listing pods")
		list := &corev1.PodList{}
		err := c.client.List(ctx, list, client.InNamespace(c.config.NS), client.MatchingLabels(target.PodLabels))
		if err != nil {
			lgr.Error(err, "listing pods")
			retErr = multierror.Append(retErr, fmt.Errorf("listing pods: %w", err))
			continue
		}
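
		// connectionCountByPod is indexed in parallel with list.Items; pods that are
		// skipped below simply keep a zero count.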
lgr.Info("gathering metrics for each pod")
connectionCountByPod := make([]float64, len(list.Items))
nReadyPods := 0
var totalConnectionCount float64
for i, pod := range list.Items {
lgr := lgr.WithValues("pod", pod.Name, "namespace", pod.Namespace)
if !podIsReady(&pod) {
lgr.Info("pod is not ready", "name", pod.Name)
continue
}
nReadyPods++
ctx := logr.NewContext(ctx, lgr)
count, err := target.ScrapeFn(ctx, c.restClient, &pod)
if err != nil {
				// Check whether the pod is still active. It might have become unready after the readiness check
				// above (this closes a race condition). An error from podIsActive is deliberately ignored so that
				// retErr reflects the scraping error rather than the error from checking whether the pod is active.
				if active, err := podIsActive(ctx, lgr, c.client, client.ObjectKeyFromObject(&pod)); err == nil && !active {
					lgr.Info("pod isn't active anymore")
					continue
				}

				lgr.Error(err, "scraping pod")
				retErr = multierror.Append(retErr, fmt.Errorf("scraping pod %q: %w", pod.Name, err))
				continue
			}

			connectionCountByPod[i] = count
			totalConnectionCount += count
		}
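
		// Average the scraped connection counts across ready replicas; the result is only
		// used when at least three replicas are ready (checked below).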
		avgConnectionCount := totalConnectionCount / float64(nReadyPods)

		// Only rebalance connections when three or more replicas are ready.
		// Otherwise we will just push the connections to the other replica.
		if nReadyPods < 3 {
			lgr.Info("not enough ready pods to rebalance connections", "readyPods", nReadyPods)
			continue
		}
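
		// Ask processVotes for the pod, if any, whose connection count is disproportionately
		// high relative to the average; an empty name means there is nothing to evict.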
lgr.Info("processing votes")
pod := c.processVotes(list, connectionCountByPod, avgConnectionCount)
if pod == "" {
lgr.Info("no pod to evict")
continue
}
lgr.Info("evicting pod due to high relative connection concurrency", "name", pod)
eviction := &policyv1beta1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod,
Namespace: c.config.NS,
},
}
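		// Evict through the Eviction API rather than deleting the pod directly so that
		// PodDisruptionBudgets are respected.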
		if err := c.clientset.CoreV1().Pods(eviction.Namespace).EvictV1beta1(ctx, eviction); err != nil {
			lgr.Error(err, "unable to evict pod", "name", pod)
			// don't add the error to return since we shouldn't retry right away
		}
	}

	if err := retErr.ErrorOrNil(); err != nil {
		c.logger.Error(err, "reconciling ingress controller resources")
		return err
	}
	return nil
}