in clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go [500:581]
func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runtime.Object, isDeleted bool) (*objectChecker, error) {
runtimeObjectNamespace, err := runtimeobjects.GetNamespaceFromRuntimeObject(obj)
if err != nil {
return nil, err
}
runtimeObjectSelector, err := runtimeobjects.GetSelectorFromRuntimeObject(obj)
if err != nil {
return nil, err
}
runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), obj)
if err != nil {
return nil, err
}
var isPodUpdated func(*v1.Pod) error
if w.checkIfPodsAreUpdated {
isPodUpdated, err = runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(obj)
if err != nil {
return nil, err
}
}
if isDeleted {
runtimeObjectReplicas = &runtimeobjects.ConstReplicas{0}
}
key, err := runtimeobjects.CreateMetaNamespaceKey(obj)
if err != nil {
return nil, fmt.Errorf("meta key creation error: %v", err)
}
o := newObjectChecker(key)
o.lock.Lock()
defer o.lock.Unlock()
w.handlingGroup.Start(func() {
// We cannot use o.stopCh for runtimeObjectReplicas.Start as it's not clear if it's closed on happy path (no errors, no timeout).
// TODO(mborsz): Migrate to o.stopCh.
stopCh := make(chan struct{})
defer close(stopCh)
if err := runtimeObjectReplicas.Start(stopCh); err != nil {
klog.Errorf("%s: error while starting runtimeObjectReplicas: %v", key, err)
o.err = fmt.Errorf("failed to start runtimeObjectReplicas: %v", err)
return
}
options := &measurementutil.WaitForPodOptions{
Selector: &measurementutil.ObjectSelector{
Namespace: runtimeObjectNamespace,
LabelSelector: runtimeObjectSelector.String(),
FieldSelector: "",
},
DesiredPodCount: runtimeObjectReplicas.Replicas,
CountErrorMargin: w.countErrorMargin,
CallerName: w.String(),
WaitForPodsInterval: defaultWaitForPodsInterval,
IsPodUpdated: isPodUpdated,
}
// This function sets the status (and error message) for the object checker.
// The handling of bad statuses and errors is done by gather() function of the measurement.
err = measurementutil.WaitForPods(w.clusterFramework.GetClientSets().GetClient(), o.stopCh, options)
o.lock.Lock()
defer o.lock.Unlock()
if err != nil {
if o.isRunning {
// Log error only if checker wasn't terminated.
klog.Errorf("%s: error for %v: %v", w, key, err)
o.err = fmt.Errorf("%s: %v", key, err)
}
if o.status == timeout {
if isDeleted {
o.status = deleteTimeout
}
klog.Errorf("%s: %s timed out", w, key)
}
return
}
o.isRunning = false
if isDeleted {
o.status = deleted
return
}
o.status = running
})
return o, nil
}