in clusterloader2/pkg/measurement/util/wait_for_pods.go [52:115]
// WaitForPods polls the pods matched by options.Selector until the desired
// number of them is running and updated, or until stopCh fires.
//
// A pod store is created for the selector and stopped when the function
// returns. Every options.WaitForPodsInterval the store is listed, the startup
// status is recomputed and logged, and pods that appeared or disappeared
// against the current scaling direction are reported via klog.Errorf.
//
// It returns nil when either
//   - every listed pod is running or inactive, every running pod is updated,
//     and the number of running updated pods equals options.DesiredPodCount(), or
//   - options.CountErrorMargin is positive and the pod counts are within that
//     margin of the desired count (see the inline comment below).
//
// It returns an error if the pod store cannot be created (the cause is wrapped
// and reachable through errors.Is / errors.As), or a timeout error carrying a
// summary of the most recent pod status once stopCh is closed or delivers a
// value.
func WaitForPods(clientSet clientset.Interface, stopCh <-chan struct{}, options *WaitForPodOptions) error {
	ps, err := NewPodStore(clientSet, options.Selector)
	if err != nil {
		return fmt.Errorf("pod store creation error: %w", err)
	}
	defer ps.Stop()

	oldPods := ps.List()
	scaling := uninitialized
	var podsStatus PodsStartupStatus
	var lastIsPodUpdatedError error
	// polled records whether podsStatus has been computed at least once.
	polled := false

	for {
		select {
		case <-stopCh:
			desiredPodCount := options.DesiredPodCount()
			if !polled {
				// stopCh fired before the first poll completed, so podsStatus is
				// still its zero value. Derive it from the initial snapshot so the
				// returned error describes the pods that were actually observed.
				podsStatus = ComputePodsStartupStatus(oldPods, desiredPodCount, options.IsPodUpdated)
				if podsStatus.LastIsPodUpdatedError != nil {
					lastIsPodUpdatedError = podsStatus.LastIsPodUpdatedError
				}
			}
			// Diagnostics are based on the last snapshot taken (oldPods).
			pods := ComputePodsStatus(oldPods)
			klog.V(2).Infof("%s: %s: expected %d pods, got %d pods (not RunningAndReady pods: %v)", options.CallerName, options.Selector.String(), desiredPodCount, len(oldPods), pods.NotRunningAndReady())
			klog.V(2).Infof("%s: %s: all pods: %v", options.CallerName, options.Selector.String(), pods)
			klog.V(2).Infof("%s: %s: last IsPodUpdated error: %v", options.CallerName, options.Selector.String(), lastIsPodUpdatedError)
			return fmt.Errorf("timeout while waiting for %d pods to be running in namespace '%v' with labels '%v' and fields '%v' - summary of pods : %s",
				desiredPodCount, options.Selector.Namespace, options.Selector.LabelSelector, options.Selector.FieldSelector, podsStatus.String())
		case <-time.After(options.WaitForPodsInterval):
			desiredPodCount := options.DesiredPodCount()
			// The scaling direction is derived from the previous snapshot; it
			// decides which pod-set changes below are treated as unexpected.
			switch {
			case len(oldPods) == desiredPodCount:
				scaling = none
			case len(oldPods) < desiredPodCount:
				scaling = up
			case len(oldPods) > desiredPodCount:
				scaling = down
			}

			pods := ps.List()
			podsStatus = ComputePodsStartupStatus(pods, desiredPodCount, options.IsPodUpdated)
			polled = true
			// Remember the most recent non-nil error so it can be logged on timeout.
			if podsStatus.LastIsPodUpdatedError != nil {
				lastIsPodUpdatedError = podsStatus.LastIsPodUpdatedError
			}

			diff := DiffPods(oldPods, pods)
			// Pods vanishing is only expected while scaling down.
			deletedPods := diff.DeletedPods()
			if scaling != down && len(deletedPods) > 0 {
				klog.Errorf("%s: %s: %d pods disappeared: %v", options.CallerName, options.Selector.String(), len(deletedPods), strings.Join(deletedPods, ", "))
			}
			// Pods showing up is only expected while scaling up.
			addedPods := diff.AddedPods()
			if scaling != up && len(addedPods) > 0 {
				klog.Errorf("%s: %s: %d pods appeared: %v", options.CallerName, options.Selector.String(), len(addedPods), strings.Join(addedPods, ", "))
			}
			klog.V(2).Infof("%s: %s: %s", options.CallerName, options.Selector.String(), podsStatus.String())

			// We allow inactive pods (e.g. eviction happened).
			// We wait until there is a desired number of pods running and all other pods are inactive.
			if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == podsStatus.RunningUpdated && podsStatus.RunningUpdated == desiredPodCount {
				return nil
			}
			// When using preemptibles on large scale, the number of ready nodes is not stable and
			// reaching DesiredPodCount could take a very long time, so a margin of error is accepted.
			// The overall number of pods (especially Inactive pods) should not grow unchecked.
			if options.CountErrorMargin > 0 && podsStatus.RunningUpdated >= desiredPodCount-options.CountErrorMargin && len(pods)-podsStatus.Inactive <= desiredPodCount && podsStatus.Inactive <= options.CountErrorMargin {
				return nil
			}
			oldPods = pods
		}
	}
}