clusterloader2/pkg/measurement/util/runtimeobjects/runtimeobjects.go (345 lines of code) (raw):

/* Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package runtimeobjects import ( "context" "fmt" "strconv" goerrors "github.com/go-errors/errors" gocmp "github.com/google/go-cmp/cmp" appsv1 "k8s.io/api/apps/v1" batch "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/perf-tests/clusterloader2/pkg/framework/client" ) // ListRuntimeObjectsForKind returns objects of given gvr that satisfy given labelSelector and fieldSelector. func ListRuntimeObjectsForKind(d dynamic.Interface, gvr schema.GroupVersionResource, labelSelector, fieldSelector string) ([]runtime.Object, error) { var runtimeObjectsList []runtime.Object var listFunc func() error listOpts := metav1.ListOptions{ LabelSelector: labelSelector, FieldSelector: fieldSelector, } listFunc = func() error { list, err := d.Resource(gvr).List(context.TODO(), listOpts) if err != nil { return err } runtimeObjectsList = make([]runtime.Object, len(list.Items)) for i := range list.Items { runtimeObjectsList[i] = &list.Items[i] } return nil } if err := client.RetryWithExponentialBackOff(client.RetryFunction(listFunc)); err != nil { return nil, err } return runtimeObjectsList, nil } // GetResourceVersionFromRuntimeObject returns resource version of given runtime object. func GetResourceVersionFromRuntimeObject(obj runtime.Object) (uint64, error) { accessor, err := meta.Accessor(obj) if err != nil { return 0, fmt.Errorf("accessor error: %v", err) } version := accessor.GetResourceVersion() if len(version) == 0 { return 0, nil } return strconv.ParseUint(version, 10, 64) } // GetIsPodUpdatedPredicateFromRuntimeObject returns a func(*corev1.Pod) bool predicate // that can be used to check if given pod represents the desired state of pod. func GetIsPodUpdatedPredicateFromRuntimeObject(obj runtime.Object) (func(*corev1.Pod) error, error) { switch typed := obj.(type) { case *unstructured.Unstructured: return getIsPodUpdatedPodPredicateFromUnstructured(typed) default: return nil, goerrors.Errorf("unsupported kind when getting updated pod predicate: %v", obj) } } // Auxiliary error type for lazy evaluation of gocmp.Diff which is known to be // computationally expensive. type lazySpecDiffError struct { templateSpec corev1.PodSpec podSpec corev1.PodSpec } func (lsde *lazySpecDiffError) Error() string { return fmt.Sprintf("Not matching templates, diff: %v", gocmp.Diff(lsde.templateSpec, lsde.podSpec)) } func getIsPodUpdatedPodPredicateFromUnstructured(obj *unstructured.Unstructured) (func(_ *corev1.Pod) error, error) { templateMap, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec", "template") if err != nil { return nil, goerrors.Errorf("failed to get pod template: %v", err) } if !ok { return nil, goerrors.Errorf("spec.template is not set in object %v", obj.UnstructuredContent()) } template := corev1.PodTemplateSpec{} if err := runtime.DefaultUnstructuredConverter.FromUnstructured(templateMap, &template); err != nil { return nil, goerrors.Errorf("failed to parse spec.teemplate as v1.PodTemplateSpec") } return func(pod *corev1.Pod) error { if !equality.Semantic.DeepDerivative(template.Spec, pod.Spec) { return &lazySpecDiffError{template.Spec, pod.Spec} } return nil }, nil } // GetSpecFromRuntimeObject returns spec of given runtime object. func GetSpecFromRuntimeObject(obj runtime.Object) (interface{}, error) { if obj == nil { return nil, nil } switch typed := obj.(type) { case *unstructured.Unstructured: return getSpecFromUnstrutured(typed) case *corev1.ReplicationController: return typed.Spec, nil case *appsv1.ReplicaSet: return typed.Spec, nil case *appsv1.Deployment: return typed.Spec, nil case *appsv1.StatefulSet: return typed.Spec, nil case *appsv1.DaemonSet: return typed.Spec, nil case *batch.Job: return typed.Spec, nil default: return nil, fmt.Errorf("unsupported kind when getting spec: %v", obj) } } // Note: This function assumes each controller has field Spec. func getSpecFromUnstrutured(obj *unstructured.Unstructured) (map[string]interface{}, error) { spec, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec") if err != nil { return nil, fmt.Errorf("try to acquire spec failed, %v", err) } if !ok { return nil, fmt.Errorf("try to acquire spec failed, no field spec for obj %s", obj.GetName()) } return spec, nil } // GetReplicasFromRuntimeObject returns replicas number from given runtime object. func GetReplicasFromRuntimeObject(c clientset.Interface, obj runtime.Object) (ReplicasWatcher, error) { if obj == nil { return &ConstReplicas{0}, nil } switch typed := obj.(type) { case *unstructured.Unstructured: return getReplicasFromUnstrutured(c, typed) case *corev1.ReplicationController: if typed.Spec.Replicas != nil { return &ConstReplicas{int(*typed.Spec.Replicas)}, nil } return &ConstReplicas{0}, nil case *appsv1.ReplicaSet: if typed.Spec.Replicas != nil { return &ConstReplicas{int(*typed.Spec.Replicas)}, nil } return &ConstReplicas{0}, nil case *appsv1.Deployment: if typed.Spec.Replicas != nil { return &ConstReplicas{int(*typed.Spec.Replicas)}, nil } return &ConstReplicas{0}, nil case *appsv1.StatefulSet: if typed.Spec.Replicas != nil { return &ConstReplicas{int(*typed.Spec.Replicas)}, nil } return &ConstReplicas{0}, nil case *appsv1.DaemonSet: return getDaemonSetNumSchedulableNodes(c, &typed.Spec.Template.Spec) case *batch.Job: if typed.Spec.Parallelism != nil { return &ConstReplicas{int(*typed.Spec.Parallelism)}, nil } return &ConstReplicas{0}, nil default: return nil, fmt.Errorf("unsupported kind when getting number of replicas: %v", obj) } } // getDaemonSetNumSchedulableNodes returns the number of schedulable nodes matching both nodeSelector and NodeAffinity. func getDaemonSetNumSchedulableNodes(c clientset.Interface, podSpec *corev1.PodSpec) (ReplicasWatcher, error) { selector, err := metav1.LabelSelectorAsSelector(metav1.SetAsLabelSelector(podSpec.NodeSelector)) if err != nil { return nil, err } return NewNodeCounter(c, selector, podSpec.Affinity, podSpec.Tolerations), nil } // Note: This function assumes each controller has field Spec.Replicas, except DaemonSets and Job. func getReplicasFromUnstrutured(c clientset.Interface, obj *unstructured.Unstructured) (ReplicasWatcher, error) { spec, err := getSpecFromUnstrutured(obj) if err != nil { return nil, err } return tryAcquireReplicasFromUnstructuredSpec(c, spec, obj.GetKind()) } func tryAcquireReplicasFromUnstructuredSpec(c clientset.Interface, spec map[string]interface{}, kind string) (ReplicasWatcher, error) { switch kind { case "DaemonSet": parser, err := newDaemonSetPodSpecParser(spec) if err != nil { return nil, err } var podSpec corev1.PodSpec if err := parser.getDaemonSetNodeSelectorFromUnstructuredSpec(&podSpec); err != nil { return nil, err } if err := parser.getDaemonSetAffinityFromUnstructuredSpec(&podSpec); err != nil { return nil, err } if err := parser.getDaemonSetTolerationsFromUnstructuredSpec(&podSpec); err != nil { return nil, err } return getDaemonSetNumSchedulableNodes(c, &podSpec) case "Job": replicas, found, err := unstructured.NestedInt64(spec, "parallelism") if err != nil { return nil, fmt.Errorf("try to acquire job parallelism failed, %v", err) } if !found { return &ConstReplicas{0}, nil } return &ConstReplicas{int(replicas)}, nil default: replicas, found, err := unstructured.NestedInt64(spec, "replicas") if err != nil { return nil, fmt.Errorf("try to acquire replicas failed, %v", err) } if !found { return &ConstReplicas{0}, nil } return &ConstReplicas{int(replicas)}, nil } } type daemonSetPodSpecParser map[string]interface{} func newDaemonSetPodSpecParser(spec map[string]interface{}) (daemonSetPodSpecParser, error) { template, found, err := unstructured.NestedMap(spec, "template") if err != nil || !found { return nil, err } podSpec, found, err := unstructured.NestedMap(template, "spec") if err != nil || !found { return nil, err } return podSpec, nil } func (p daemonSetPodSpecParser) getDaemonSetNodeSelectorFromUnstructuredSpec(spec *corev1.PodSpec) error { nodeSelector, _, err := unstructured.NestedStringMap(p, "nodeSelector") spec.NodeSelector = nodeSelector return err } func (p daemonSetPodSpecParser) getDaemonSetAffinityFromUnstructuredSpec(spec *corev1.PodSpec) error { unstructuredAffinity, found, err := unstructured.NestedMap(p, "affinity") if err != nil || !found { return err } affinity := &corev1.Affinity{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredAffinity, affinity) spec.Affinity = affinity return err } func addOrUpdateTolerationInPodSpec(spec *corev1.PodSpec, toleration *corev1.Toleration) { var newTolerations []corev1.Toleration updated := false for i := range spec.Tolerations { if toleration.MatchToleration(&spec.Tolerations[i]) { newTolerations = append(newTolerations, *toleration) updated = true continue } newTolerations = append(newTolerations, spec.Tolerations[i]) } if !updated { newTolerations = append(newTolerations, *toleration) } spec.Tolerations = newTolerations } // addOrUpdateDaemonPodTolerations adds tolerations that are added to each pod // by daemonset controller. // NOTICE: keep in sync with // // https://github.com/kubernetes/kubernetes/blob/release-1.32/pkg/controller/daemon/util/daemonset_util.go#L48 func addOrUpdateDaemonPodTolerations(spec *corev1.PodSpec) { tolerations := []corev1.Toleration{ { Key: corev1.TaintNodeNotReady, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoExecute, }, { Key: corev1.TaintNodeDiskPressure, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule, }, { Key: corev1.TaintNodeMemoryPressure, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule, }, { Key: corev1.TaintNodePIDPressure, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule, }, { Key: corev1.TaintNodeUnschedulable, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule, }, } if spec.HostNetwork { tolerations = append(tolerations, corev1.Toleration{ Key: corev1.TaintNodeNetworkUnavailable, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoSchedule, }) } for i := range tolerations { addOrUpdateTolerationInPodSpec(spec, &tolerations[i]) } } func (p daemonSetPodSpecParser) getDaemonSetTolerationsFromUnstructuredSpec(spec *corev1.PodSpec) error { addOrUpdateDaemonPodTolerations(spec) unstructuredTolerations, found, err := unstructured.NestedSlice(p, "tolerations") if err != nil || !found { return err } for _, unstructuredToleration := range unstructuredTolerations { var toleration corev1.Toleration err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredToleration.(map[string]interface{}), &toleration) if err != nil { break } addOrUpdateTolerationInPodSpec(spec, &toleration) } return err } // IsEqualRuntimeObjectsSpec returns true if given runtime objects have identical specs. func IsEqualRuntimeObjectsSpec(runtimeObj1, runtimeObj2 runtime.Object) (bool, error) { runtimeObj1Spec, err := GetSpecFromRuntimeObject(runtimeObj1) if err != nil { return false, err } runtimeObj2Spec, err := GetSpecFromRuntimeObject(runtimeObj2) if err != nil { return false, err } return equality.Semantic.DeepEqual(runtimeObj1Spec, runtimeObj2Spec), nil } // GetNumObjectsMatchingSelector returns number of objects matching the given selector. func GetNumObjectsMatchingSelector(c dynamic.Interface, namespace string, resource schema.GroupVersionResource, labelSelector labels.Selector) (int, error) { var numObjects int listFunc := func() error { list, err := c.Resource(resource).Namespace(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()}) if err != nil { return err } numObjects = len(list.Items) return nil } err := client.RetryWithExponentialBackOff(client.RetryFunction(listFunc)) return numObjects, err } // The pod can only schedule onto nodes that satisfy requirements in NodeAffinity. func podMatchesNodeAffinity(affinity *corev1.Affinity, node *corev1.Node) (bool, error) { // 1. nil NodeSelector matches all nodes (i.e. does not filter out any nodes) // 2. nil []NodeSelectorTerm (equivalent to non-nil empty NodeSelector) matches no nodes // 3. zero-length non-nil []NodeSelectorTerm matches no nodes also, just for simplicity // 4. nil []NodeSelectorRequirement (equivalent to non-nil empty NodeSelectorTerm) matches no nodes // 5. zero-length non-nil []NodeSelectorRequirement matches no nodes also, just for simplicity // 6. non-nil empty NodeSelectorRequirement is not allowed if affinity != nil && affinity.NodeAffinity != nil { nodeAffinity := affinity.NodeAffinity // if no required NodeAffinity requirements, will do no-op, means select all nodes. if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { return true, nil } return corev1helpers.MatchNodeSelectorTerms(node, nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution) } return true, nil }