webhooks/alloydb-nodeselector-mwh/handlers/pod_nodeselector_handler.go (172 lines of code) (raw):
package handlers
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"k8s.io/api/admission/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
log "k8s.io/klog/v2"
)
type AdmitFunc func(*v1beta1.AdmissionReview, map[string]string) *v1beta1.AdmissionResponse
var nodelSelectors map[string]string
func Routes() {
http.HandleFunc("/mutate", func(w http.ResponseWriter, r *http.Request) {
serve(w, r, mutatePod)
})
log.Info("handlers.Routes():Registered the handler for the path /mutate")
}
func BuildSelectors() {
path := os.Getenv("SELECTORS_CONFIG_PATH")
fileName := os.Getenv("SELECTORS_CONFIG_FILE")
filePath := filepath.Join(path, fileName)
configFile, err := os.Open(filePath)
if err != nil {
log.Fatalf(fmt.Sprintf("handlers.BuildSelectors():Error opening node selectors config file %s", os.Getenv("SELECTORS_CONFIG_PATH")), err)
}
defer configFile.Close()
data, err := io.ReadAll(configFile)
if err != nil {
log.Fatalf("handlers.BuildSelectors():Error reading the node selectors data from the file:: %v", err)
}
if err = json.Unmarshal(data, &nodelSelectors); err != nil {
log.Fatalf("handlers.BuildSelectors():Error unmarshalling the node selectors data from the file:: %v", err)
}
log.Info("handlers.BuildSelectors():Initialized the node selectors to be configured for the pod")
}
func serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) {
if r.Method != http.MethodPost {
if r.Header.Get("User-Agent") == "Kubelet" {
w.WriteHeader(http.StatusOK)
return
}
log.Errorf("handlers.serve():Received a %s request instead of POST", r.Method)
http.Error(w, fmt.Sprintf("Only POST requests are accepted, received: %s", r.Method), http.StatusMethodNotAllowed)
return
}
var body []byte
if r.Body != nil {
if data, err := io.ReadAll(r.Body); err != nil {
log.Errorf("handlers.serve():Error occured while reading from the request %v", err)
http.Error(w, fmt.Sprintf("Could not read the request body or the request body is empty: %v", err), http.StatusBadRequest)
return
} else {
body = data
}
} else {
http.Error(w, "Empty request received from the client", http.StatusBadRequest)
return
}
defer r.Body.Close()
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
log.Errorf("handlers.serve():Error occurred, content-type must be set to application/json but received %s", contentType)
http.Error(w, fmt.Sprintf("Invalid Content-Type header received: %s", contentType), http.StatusBadRequest)
return
}
addmissionReview := v1beta1.AdmissionReview{}
if err := json.Unmarshal(body, &addmissionReview); err != nil {
log.Errorf("handlers.serve():Could not unmarshall the AdmissionReview object from the request:: %v", err)
http.Error(w, fmt.Sprintf("Could not unmarshall AdmissionReview from the request body is empty:: %v", err), http.StatusBadRequest)
return
}
log.Infof("handlers.serve():Received a valid AdmissionReview for mutating the pod UID = %s", addmissionReview.Request.UID)
admissionResponse := admit(&addmissionReview, nodelSelectors)
addmissionReview.Response = admissionResponse
resp, err := json.Marshal(addmissionReview)
if err != nil {
log.Errorf("handlers.serve():Error marshalling the AdmissionReview object:: %v", err)
http.Error(w, fmt.Sprintf("Error marshalling the AdmissionReview object:: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(resp); err != nil {
log.Errorf("handlers.serve():Error writing JSON response back to the client:: %v", err)
http.Error(w, fmt.Sprintf("Error writing the JSON back to the client:: %v", err), http.StatusInternalServerError)
return
}
}
func mutatePod(ar *v1beta1.AdmissionReview, selectors map[string]string) *v1beta1.AdmissionResponse {
log.Info("handlers.mutatePod():Starting to add AlloyDB specific node selectors to the pod")
raw := ar.Request.Object.Raw
pod := corev1.Pod{}
if err := json.Unmarshal(raw, &pod); err != nil {
return &v1beta1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: false,
Result: &metav1.Status{
Message: err.Error(),
},
}
}
if pod.TypeMeta.Kind != "Pod" {
return &v1beta1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: false,
Result: &metav1.Status{
Message: "Invalid Kind for the request, only pods are supported for mutation",
},
}
}
if len(selectors) == 0 {
return &v1beta1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: true,
Result: &metav1.Status{
Status: "Success",
},
}
}
existing := pod.Spec.NodeSelector
combined := mergeMaps(existing, selectors)
patch, err := constructPatch(combined)
if err != nil {
log.Errorf("handlers.mutatePod():Could not create a patch for adding node selectors to the pod:: %v", err)
return &v1beta1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: false,
Result: &metav1.Status{
Message: err.Error(),
},
}
}
log.Info("handlers.mutatePod():Added the AlloyDB Omni nodepool specific node selectors to the pod & returning the patch")
return &v1beta1.AdmissionResponse{
UID: ar.Request.UID,
Allowed: true,
Patch: patch,
PatchType: func() *v1beta1.PatchType {
pt := v1beta1.PatchTypeJSONPatch
return &pt
}(),
}
}
func mergeMaps(existing, new map[string]string) map[string]string {
result := make(map[string]string)
for k, val := range existing {
result[k] = val
}
for k, val := range new {
if _, ok := result[k]; !ok { // Don't overwrite but only add unique keys
result[k] = val
}
}
return result
}
func constructPatch(combined map[string]string) ([]byte, error) {
patch := []interface{}{
map[string]interface{}{
"op": "replace",
"path": "/spec/nodeSelector",
"value": combined,
},
}
patchBytes, err := json.Marshal(patch)
if err != nil {
return nil, err
}
return patchBytes, nil
}