pkg/controllers/elasticquota_controller.go (107 lines of code) (raw):

/* Copyright 2023 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controllers import ( "context" "fmt" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" quota "k8s.io/apiserver/pkg/quota/v1" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" ) type ElasticQuotaReconciler struct { recorder record.EventRecorder client.Client Scheme *runtime.Scheme Workers int } // +kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=elasticquota,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=elasticquota/status,verbs=get;update;patch // +kubebuilder:rbac:groups=scheduling.x-k8s.io,resources=elasticquota/finalizers,verbs=update func (r *ElasticQuotaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := log.FromContext(ctx) log.Info("reconciling") eqList := &schedv1alpha1.ElasticQuotaList{} if err := r.List(ctx, eqList, client.InNamespace(req.Namespace)); err != nil { if apierrs.IsNotFound(err) { log.V(5).Info("no elasticquota found") return ctrl.Result{}, nil } log.V(3).Error(err, "Unable to retrieve elasticquota") return ctrl.Result{}, err } // TODO: When elastic quota supports multiple instances in a namespace, modify this if len(eqList.Items) == 0 { log.V(5).Info("no elasticquota found") return ctrl.Result{}, nil } eq := &eqList.Items[0] used, err := r.computeElasticQuotaUsed(ctx, req.Namespace, eq) if err != nil { return ctrl.Result{}, err } // Ignore this loop if the usage value has not changed if apiequality.Semantic.DeepEqual(used, eq.Status.Used) { return ctrl.Result{}, nil } // create a usage object that is based on the elastic quota version that will handle updates // by default, we set used to the current status newEQ := eq.DeepCopy() newEQ.Status.Used = used if err = r.patchElasticQuota(ctx, eq, newEQ); err != nil { return ctrl.Result{}, err } r.recorder.Event(eq, v1.EventTypeNormal, "Synced", fmt.Sprintf("Elastic Quota %s synced successfully", req.NamespacedName)) return ctrl.Result{}, nil } func (r *ElasticQuotaReconciler) patchElasticQuota(ctx context.Context, old, new *schedv1alpha1.ElasticQuota) error { patch := client.MergeFrom(old) return r.Status().Patch(ctx, new, patch) } func (r *ElasticQuotaReconciler) computeElasticQuotaUsed(ctx context.Context, namespace string, eq *schedv1alpha1.ElasticQuota) (v1.ResourceList, error) { used := newZeroUsed(eq) podList := &v1.PodList{} if err := r.List(ctx, podList, client.InNamespace(namespace)); err != nil { return nil, err } for _, p := range podList.Items { if p.Status.Phase == v1.PodRunning { used = quota.Add(used, computePodResourceRequest(&p)) } } return used, nil } // computePodResourceRequest returns a v1.ResourceList that covers the largest // width in each resource dimension. Because init-containers run sequentially, we collect // the max in each dimension iteratively. In contrast, we sum the resource vectors for // regular containers since they run simultaneously. // // If Pod Overhead is specified and the feature gate is set, the resources defined for Overhead // are added to the calculated Resource request sum // // Example: // // Pod: // // InitContainers // IC1: // CPU: 2 // Memory: 1G // IC2: // CPU: 2 // Memory: 3G // Containers // C1: // CPU: 2 // Memory: 1G // C2: // CPU: 1 // Memory: 1G // // Result: CPU: 3, Memory: 3G func computePodResourceRequest(pod *v1.Pod) v1.ResourceList { result := v1.ResourceList{} for _, container := range pod.Spec.Containers { result = quota.Add(result, container.Resources.Requests) } initRes := v1.ResourceList{} // take max_resource for init_containers for _, container := range pod.Spec.InitContainers { initRes = quota.Max(initRes, container.Resources.Requests) } // If Overhead is being utilized, add to the total requests for the pod if pod.Spec.Overhead != nil { quota.Add(result, pod.Spec.Overhead) } // take max_resource for init_containers and containers return quota.Max(result, initRes) } // newZeroUsed will return the zero value of the union of min and max func newZeroUsed(eq *schedv1alpha1.ElasticQuota) v1.ResourceList { minResources := quota.ResourceNames(eq.Spec.Min) maxResources := quota.ResourceNames(eq.Spec.Max) res := v1.ResourceList{} for _, v := range minResources { res[v] = *resource.NewQuantity(0, resource.DecimalSI) } for _, v := range maxResources { res[v] = *resource.NewQuantity(0, resource.DecimalSI) } return res } func (r *ElasticQuotaReconciler) SetupWithManager(mgr ctrl.Manager) error { r.recorder = mgr.GetEventRecorderFor("ElasticQuotaController") return ctrl.NewControllerManagedBy(mgr). Watches(&v1.Pod{}, &handler.EnqueueRequestForObject{}). For(&schedv1alpha1.ElasticQuota{}). WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). Complete(r) }