func()

in clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go [500:581]


// waitForRuntimeObject creates an objectChecker for obj and schedules, via
// w.handlingGroup, a background function that waits for the pods selected by
// obj to reach the desired count.
//
// The namespace, label selector and replicas source are derived from obj. When
// isDeleted is true the replicas source is replaced with a constant 0, and a
// successful wait marks the checker as deleted instead of running. When
// w.checkIfPodsAreUpdated is set, an "is pod updated" predicate derived from
// obj is passed to the wait as well.
//
// A non-nil error is returned only when the required information (or the meta
// namespace key) cannot be obtained from obj. Failures of the wait itself are
// recorded on the returned checker (its err and status fields), not returned.
func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runtime.Object, isDeleted bool) (*objectChecker, error) {
	runtimeObjectNamespace, err := runtimeobjects.GetNamespaceFromRuntimeObject(obj)
	if err != nil {
		return nil, err
	}
	runtimeObjectSelector, err := runtimeobjects.GetSelectorFromRuntimeObject(obj)
	if err != nil {
		return nil, err
	}
	runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), obj)
	if err != nil {
		return nil, err
	}
	// isPodUpdated stays nil unless the measurement was asked to verify updates.
	var isPodUpdated func(*v1.Pod) error
	if w.checkIfPodsAreUpdated {
		isPodUpdated, err = runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(obj)
		if err != nil {
			return nil, err
		}
	}
	if isDeleted {
		// A deleted object is expected to end up with no pods at all.
		runtimeObjectReplicas = &runtimeobjects.ConstReplicas{0}
	}
	key, err := runtimeobjects.CreateMetaNamespaceKey(obj)
	if err != nil {
		return nil, fmt.Errorf("meta key creation error: %v", err)
	}

	o := newObjectChecker(key)
	// The checker's lock is held until this function returns, so the background
	// function below cannot touch the checker's state before then.
	o.lock.Lock()
	defer o.lock.Unlock()
	w.handlingGroup.Start(func() {
		// We cannot use o.stopCh for runtimeObjectReplicas.Start as it's not clear if it's closed on happy path (no errors, no timeout).
		// TODO(mborsz): Migrate to o.stopCh.
		stopCh := make(chan struct{})
		defer close(stopCh)
		if err := runtimeObjectReplicas.Start(stopCh); err != nil {
			klog.Errorf("%s: error while starting runtimeObjectReplicas: %v", key, err)
			// Checker state is shared with other readers; guard the write
			// with the same lock used for every other update below.
			o.lock.Lock()
			o.err = fmt.Errorf("failed to start runtimeObjectReplicas: %v", err)
			o.lock.Unlock()
			return
		}
		options := &measurementutil.WaitForPodOptions{
			Selector: &measurementutil.ObjectSelector{
				Namespace:     runtimeObjectNamespace,
				LabelSelector: runtimeObjectSelector.String(),
				FieldSelector: "",
			},
			DesiredPodCount:     runtimeObjectReplicas.Replicas,
			CountErrorMargin:    w.countErrorMargin,
			CallerName:          w.String(),
			WaitForPodsInterval: defaultWaitForPodsInterval,
			IsPodUpdated:        isPodUpdated,
		}
		// This function sets the status (and error message) for the object checker.
		// The handling of bad statuses and errors is done by gather() function of the measurement.
		//
		// err is deliberately local to this closure: the enclosing function has
		// already returned by the time the wait finishes, so its err variable
		// must not be reused here.
		err := measurementutil.WaitForPods(w.clusterFramework.GetClientSets().GetClient(), o.stopCh, options)
		o.lock.Lock()
		defer o.lock.Unlock()
		if err != nil {
			if o.isRunning {
				// Log error only if checker wasn't terminated.
				klog.Errorf("%s: error for %v: %v", w, key, err)
				o.err = fmt.Errorf("%s: %v", key, err)
			}
			if o.status == timeout {
				// A timeout while waiting for deletion gets its own status.
				if isDeleted {
					o.status = deleteTimeout
				}
				klog.Errorf("%s: %s timed out", w, key)
			}
			return
		}
		o.isRunning = false
		if isDeleted {
			o.status = deleted
			return
		}

		o.status = running
	})
	return o, nil
}