func WaitForPods()

in clusterloader2/pkg/measurement/util/wait_for_pods.go [52:115]


// WaitForPods blocks until the pods matched by options.Selector reach the
// count returned by options.DesiredPodCount, or until stopCh fires.
//
// A pod store is created for the selector (and stopped on return) and its
// contents are re-examined every options.WaitForPodsInterval. The function
// returns nil as soon as one of the following holds:
//   - every listed pod is counted as Running or Inactive, every Running pod is
//     also counted as RunningUpdated, and RunningUpdated equals the desired
//     count;
//   - options.CountErrorMargin is positive and the counts fall within that
//     margin (see the second condition at the end of the loop body).
//
// If a receive on stopCh succeeds first, an error describing the most recent
// pod summary is returned. Pods that appear or disappear against the current
// scaling direction are logged as errors but do not abort the wait. An error
// from creating the pod store is wrapped and returned immediately.
func WaitForPods(clientSet clientset.Interface, stopCh <-chan struct{}, options *WaitForPodOptions) error {
	ps, err := NewPodStore(clientSet, options.Selector)
	if err != nil {
		// %w keeps the message text unchanged while preserving the error chain.
		return fmt.Errorf("pod store creation error: %w", err)
	}
	defer ps.Stop()

	// oldPods always holds the snapshot taken by the previous iteration (or the
	// initial one), so each poll can be diffed against it.
	oldPods := ps.List()
	scaling := uninitialized
	var podsStatus PodsStartupStatus
	var lastIsPodUpdatedError error

	for {
		select {
		case <-stopCh:
			desiredPodCount := options.DesiredPodCount()
			if scaling == uninitialized {
				// scaling is assigned on every poll, so it is still uninitialized
				// only when no poll has run. In that case podsStatus would be a
				// zero value; derive it from the initial snapshot so the error
				// below describes the pods that were actually observed.
				podsStatus = ComputePodsStartupStatus(oldPods, desiredPodCount, options.IsPodUpdated)
				if podsStatus.LastIsPodUpdatedError != nil {
					lastIsPodUpdatedError = podsStatus.LastIsPodUpdatedError
				}
			}
			pods := ComputePodsStatus(oldPods)
			klog.V(2).Infof("%s: %s: expected %d pods, got %d pods (not RunningAndReady pods: %v)", options.CallerName, options.Selector.String(), desiredPodCount, len(oldPods), pods.NotRunningAndReady())
			klog.V(2).Infof("%s: %s: all pods: %v", options.CallerName, options.Selector.String(), pods)
			klog.V(2).Infof("%s: %s: last IsPodUpdated error: %v", options.CallerName, options.Selector.String(), lastIsPodUpdatedError)
			return fmt.Errorf("timeout while waiting for %d pods to be running in namespace '%v' with labels '%v' and fields '%v' - summary of pods : %s",
				desiredPodCount, options.Selector.Namespace, options.Selector.LabelSelector, options.Selector.FieldSelector, podsStatus.String())
		case <-time.After(options.WaitForPodsInterval):
			// The desired count is re-read on every poll.
			desiredPodCount := options.DesiredPodCount()

			// The scaling direction is derived from the previous snapshot and
			// decides which pod-set changes below are unexpected.
			switch {
			case len(oldPods) == desiredPodCount:
				scaling = none
			case len(oldPods) < desiredPodCount:
				scaling = up
			case len(oldPods) > desiredPodCount:
				scaling = down
			}

			pods := ps.List()
			podsStatus = ComputePodsStartupStatus(pods, desiredPodCount, options.IsPodUpdated)
			// Remember the latest non-nil error so it can be logged on timeout
			// even if a later poll reports none.
			if podsStatus.LastIsPodUpdatedError != nil {
				lastIsPodUpdatedError = podsStatus.LastIsPodUpdatedError
			}

			diff := DiffPods(oldPods, pods)
			deletedPods := diff.DeletedPods()
			if scaling != down && len(deletedPods) > 0 {
				klog.Errorf("%s: %s: %d pods disappeared: %v", options.CallerName, options.Selector.String(), len(deletedPods), strings.Join(deletedPods, ", "))
			}
			addedPods := diff.AddedPods()
			if scaling != up && len(addedPods) > 0 {
				klog.Errorf("%s: %s: %d pods appeared: %v", options.CallerName, options.Selector.String(), len(addedPods), strings.Join(addedPods, ", "))
			}
			klog.V(2).Infof("%s: %s: %s", options.CallerName, options.Selector.String(), podsStatus.String())
			// We allow inactive pods (e.g. eviction happened).
			// We wait until there is a desired number of pods running and all other pods are inactive.
			if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == podsStatus.RunningUpdated && podsStatus.RunningUpdated == desiredPodCount {
				return nil
			}
			// When using preemptibles on large scale, number of ready nodes is not stable and reaching DesiredPodCount could take a very long time.
			// Overall number of pods (especially Inactive pods) should not grow unchecked.
			if options.CountErrorMargin > 0 && podsStatus.RunningUpdated >= desiredPodCount-options.CountErrorMargin && len(pods)-podsStatus.Inactive <= desiredPodCount && podsStatus.Inactive <= options.CountErrorMargin {
				return nil
			}
			oldPods = pods
		}
	}
}