webhooks/alloydb-mutating-wh/handlers/pod_tolerations_handler.go (178 lines of code) (raw):

package handlers import ( "encoding/json" "fmt" "io" "net/http" "os" "path/filepath" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" log "k8s.io/klog/v2" "k8s.io/api/admission/v1beta1" ) type AdmitFunc func(*v1beta1.AdmissionReview, []corev1.Toleration) *v1beta1.AdmissionResponse var tolerations []corev1.Toleration func Routes() { http.HandleFunc("/mutate", func(w http.ResponseWriter, r *http.Request) { serve(w, r, mutatePod) }) log.Info("handlers.Routes():Registered the handler for the path /mutate") } func BuildTolerations() { path := os.Getenv("TOLERATION_CONFIG_PATH") fileName := os.Getenv("TOLERATION_CONFIG_FILE") filePath := filepath.Join(path, fileName) configFile, err := os.Open(filePath) if err != nil { log.Fatalf(fmt.Sprintf("handlers.BuildTolerations():Error opening tolerations config file %s", os.Getenv("TOLERATION_CONFIG_PATH")), err) } defer configFile.Close() data, err := io.ReadAll(configFile) if err != nil { log.Fatalf("handlers.BuildTolerations():Error reading the toleration data from the file:: %v", err) } if err = json.Unmarshal(data, &tolerations); err != nil { log.Fatalf("handlers.BuildTolerations():Error unmarshalling the toleration data from the file:: %v", err) } log.Info("handlers.BuildTolerations():Initialized the tolerations to be configured for the pod") } func serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) { if r.Method != http.MethodPost { if r.Header.Get("User-Agent") == "Kubelet" { w.WriteHeader(http.StatusOK) return } log.Errorf("handlers.serve():Received a %s request instead of POST", r.Method) http.Error(w, fmt.Sprintf("Only POST requests are accepted, received: %s", r.Method), http.StatusMethodNotAllowed) return } var body []byte if r.Body != nil { if data, err := io.ReadAll(r.Body); err != nil { log.Errorf("handlers.serve():Error occured while reading from the request %v", err) http.Error(w, fmt.Sprintf("Could not read the request body or the request body is empty: %v", err), http.StatusBadRequest) return } else { body = data } } else { http.Error(w, "Empty request received from the client", http.StatusBadRequest) return } defer r.Body.Close() contentType := r.Header.Get("Content-Type") if contentType != "application/json" { log.Errorf("handlers.serve():Error occurred, content-type must be set to application/json but received %s", contentType) http.Error(w, fmt.Sprintf("Invalid Content-Type header received: %s", contentType), http.StatusBadRequest) return } addmissionReview := v1beta1.AdmissionReview{} if err := json.Unmarshal(body, &addmissionReview); err != nil { log.Errorf("handlers.serve():Could not unmarshall the AdmissionReview object from the request:: %v", err) http.Error(w, fmt.Sprintf("Could not unmarshall AdmissionReview from the request body is empty:: %v", err), http.StatusBadRequest) return } log.Infof("handlers.serve():Received a valid AdmissionReview for mutating the pod UID = %s", addmissionReview.Request.UID) admissionResponse := admit(&addmissionReview, tolerations) addmissionReview.Response = admissionResponse resp, err := json.Marshal(addmissionReview) if err != nil { log.Errorf("handlers.serve():Error marshalling the AdmissionReview object:: %v", err) http.Error(w, fmt.Sprintf("Error marshalling the AdmissionReview object:: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") if _, err := w.Write(resp); err != nil { log.Errorf("handlers.serve():Error writing JSON response back to the client:: %v", err) http.Error(w, fmt.Sprintf("Error writing the JSON back to the client:: %v", err), http.StatusInternalServerError) return } } func mutatePod(ar *v1beta1.AdmissionReview, tols []corev1.Toleration) *v1beta1.AdmissionResponse { log.Info("handlers.mutatePod():Starting to add AlloyDB Omninodepool specific tolerations to the pod") raw := ar.Request.Object.Raw pod := corev1.Pod{} if err := json.Unmarshal(raw, &pod); err != nil { return &v1beta1.AdmissionResponse{ UID: ar.Request.UID, Allowed: false, Result: &metav1.Status{ Message: err.Error(), }, } } if pod.TypeMeta.Kind != "Pod" { return &v1beta1.AdmissionResponse{ UID: ar.Request.UID, Allowed: false, Result: &metav1.Status{ Message: "Invalid Kind for the request, only pods are supported for mutation", }, } } if len(tols) == 0 { return &v1beta1.AdmissionResponse{ UID: ar.Request.UID, Allowed: true, Result: &metav1.Status{ Status: "Success", }, } } existing := pod.Spec.Tolerations // Existing tolerations combined := []corev1.Toleration{} // Existing & newly added combined if len(existing) == 0 { // When no existing tolerations, combined = newly added only combined = tols } else { for _, t := range tols { if !exists(t, existing) { combined = append(combined, t) } } combined = append(combined, existing...) } patch, err := constructPatch(combined) if err != nil { log.Errorf("handlers.mutatePod():Could not create a patch for adding tolerations to the pod:: %v", err) return &v1beta1.AdmissionResponse{ UID: ar.Request.UID, Allowed: false, Result: &metav1.Status{ Message: err.Error(), }, } } log.Info("handlers.mutatePod():Added the AlloyDB Omni nodepool specific tolerations to the pod & returning the patch") return &v1beta1.AdmissionResponse{ UID: ar.Request.UID, Allowed: true, Patch: patch, PatchType: func() *v1beta1.PatchType { pt := v1beta1.PatchTypeJSONPatch return &pt }(), } } func exists(add corev1.Toleration, existing []corev1.Toleration) bool { for _, e := range existing { if add.Key == e.Key { // Only check for the key & and if there's a match, just don't overwrite it, regardless of the operator or effect return true } } return false } func constructPatch(combined []corev1.Toleration) ([]byte, error) { patch := []interface{}{ map[string]interface{}{ "op": "replace", "path": "/spec/tolerations", "value": combined, }, } patchBytes, err := json.Marshal(patch) if err != nil { return nil, err } return patchBytes, nil }