images/controller/cmd/reservation_broker/reservation_broker.go (561 lines of code) (raw):

/* Copyright 2020 Google Inc. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package main import ( "crypto/sha1" "encoding/json" "fmt" "io" "log" "math/rand" "net/http" "os" "os/exec" "path" "path/filepath" "sort" "strings" "sync" "time" "github.com/gorilla/mux" broker "selkies.io/controller/pkg" ) // Cookie max-age in seconds, 5 days. const maxCookieAgeSeconds = 432000 // Wraps server muxer, dynamic map of handlers, and listen port. type Server struct { Dispatcher *mux.Router Urls map[string]func(w http.ResponseWriter, r *http.Request) Port string } type BrokerPod struct { Name string `json:"name"` IP string `json:"ip"` SessionKey string `json:"session_key"` UserObjects []string `json:"user_objects"` } type AppContext struct { sync.RWMutex AuthHeaderName string UsernameHeader string CookieSecret string PodData broker.UserPodData AvailablePods []BrokerPod ReservedPods map[string]BrokerPod PodWatcherRunning bool } type GetPodsSpec struct { Items []struct { Metadata struct { Name string `json:"name"` Namespace string `json:"namespace"` CreationTimestamp string `json:"creationTimestamp"` DeletionTimestamp *string `json:"deletionTimestamp"` Annotations map[string]string `json:"annotations"` Labels map[string]string `json:"labels"` } `json:"metadata"` Status struct { PodIPs []struct { IP string `json:"ip"` } `json:"podIPs"` } `json:"status"` } `json:"items"` } func main() { cookieSecret := os.Getenv("COOKIE_SECRET") if len(cookieSecret) == 0 { // Generate random secret log.Printf("no 
COOKIE_SECRET env var found, generating random secret value") h := sha1.New() io.WriteString(h, fmt.Sprintf("%d.%d", rand.Intn(10000), int32(time.Now().Unix()))) cookieSecret = fmt.Sprintf("%x", h.Sum(nil)) } clientID := os.Getenv("OAUTH_CLIENT_ID") if len(clientID) == 0 { log.Fatalf("missing env, OAUTH_CLIENT_ID") } // Project ID from instance metadata projectID, err := broker.GetProjectID() if err != nil { log.Fatalf("failed to determine project ID: %v", err) } // Region from instance metadata brokerRegion, err := broker.GetInstanceRegion() if err != nil { log.Fatalf("failed to determine broker region: %v", err) } // Values available to templates from environment variables prefixed with POD_BROKER_PARAM_Name=Value // Map of Name=Value sysParams := broker.GetEnvPrefixedVars("POD_BROKER_PARAM_") // Domain from params domain, ok := sysParams["Domain"] if !ok { log.Fatal("Missing POD_BROKER_PARAM_Domain env.") } // AuthHeader from params authHeaderName, ok := sysParams["AuthHeader"] if !ok { log.Fatal("Missing POD_BROKER_PARAM_AuthHeader env.") } usernameHeader, _ := sysParams["UsernameHeader"] // Period which to scan for apps scanPeriod := 5 * time.Second // Period which to re-apply manifests resyncPeriod := 60 * time.Second // Map of cached app manifest checksums manifestChecksums := make(map[string]string, 0) // Muxed server to handle per-app routes. server := &Server{ Port: "8082", Dispatcher: mux.NewRouter(), Urls: make(map[string]func(w http.ResponseWriter, r *http.Request)), } // Map of app contexts. appContexts := make(map[string]*AppContext, 0) // Sync loop for app resources go func() { lastSync := time.Now() for { // Discover apps from their config specs located on the filesystem. // TODO: look into caching this, large number of apps and http requests can slow down the broker. 
registeredApps, err := broker.NewRegisteredAppManifestFromJSON(broker.RegisteredAppsManifestJSONFile, broker.AppTypeDeployment) if err != nil { log.Printf("failed to parse registered app manifest: %v", err) time.Sleep(2 * time.Second) continue } for _, app := range registeredApps.Apps { if len(app.Deployment.Selector) == 0 { log.Printf("error app is missing deployment.selector: %s, skipping", app.Name) continue } // Common variables id := broker.MakePodID(app.Name) fullName := fmt.Sprintf("%s-%s", app.Name, id) namespace := app.Name ts := fmt.Sprintf("%d", time.Now().Unix()) // Verify that the DefaultTier is in the list of NodeTiers and use it. var nodeTierSpec broker.NodeTierSpec found := false for _, tier := range app.NodeTiers { if tier.Name == app.DefaultTier { nodeTierSpec = tier found = true break } } if !found { log.Printf("Default tier '%s' not found in list of app node tiers", app.DefaultTier) continue } // Build map of app params from app config spec appParams := make(map[string]string, 0) for _, param := range app.AppParams { appParams[param.Name] = param.Default } // Path to write compiled template output to. destDir := path.Join(broker.BuildSourceBaseDir, app.Name) // Create template data data := &broker.UserPodData{ Namespace: namespace, ProjectID: projectID, ClientID: clientID, AppSpec: app, App: app.Name, ImageRepo: app.DefaultRepo, ImageTag: app.DefaultTag, NodeTier: nodeTierSpec, Domain: domain, User: app.Name, Username: app.Name, ID: id, FullName: fullName, ServiceName: app.ServiceName, Resources: []string{}, Patches: []string{}, JSONPatchesService: []string{}, JSONPatchesVirtualService: []string{}, JSONPatchesDeploy: []string{}, AppParams: appParams, SysParams: sysParams, NetworkPolicyData: registeredApps.NetworkPolicyData, Timestamp: ts, Region: brokerRegion, } // Build the application bundle. 
srcDirApp := path.Join(broker.BundleSourceBaseDir, app.Name) if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirDeploymentApp, srcDirApp, destDir, data); err != nil { log.Printf("%v", err) continue } var appCtx *AppContext if c, ok := appContexts[app.Name]; ok { appCtx = c } else { appCtx = &AppContext{ AuthHeaderName: authHeaderName, UsernameHeader: usernameHeader, CookieSecret: cookieSecret, PodData: *data, AvailablePods: make([]BrokerPod, 0), ReservedPods: make(map[string]BrokerPod), PodWatcherRunning: false, } appContexts[app.Name] = appCtx } // Register the app handler registerAppHandler(server, app, appCtx) // Start the pod watcher if !appCtx.PodWatcherRunning { watchPods(app, appCtx) } // Compute and cache checksum to know if we need to re-apply the manifests. prevChecksum := manifestChecksums[app.Name] if manifestChecksums[app.Name], err = broker.ChecksumDeploy(destDir); err != nil { log.Printf("failed to checksum build output directory: %v", err) continue } if prevChecksum != manifestChecksums[app.Name] { log.Printf("%s manifest checksum: %s", app.Name, manifestChecksums[app.Name]) } else { now := time.Now() if now.Sub(lastSync) >= resyncPeriod { lastSync = now } else { continue } } // Apply manifests them to the cluster. log.Printf("deploying manifests for app: %s", destDir) cmd := exec.Command("sh", "-o", "pipefail", "-c", fmt.Sprintf("kustomize build %s | kubectl apply -f -", destDir)) cmd.Dir = destDir stdoutStderr, err := cmd.CombinedOutput() if err != nil { log.Printf("error calling kubectl for %s: %v\n%s", app.Name, err, stdoutStderr) continue } } // Prune deleted BrokerAppConfigs, delete namespace and files. 
foundDirs, err := filepath.Glob(path.Join(broker.BuildSourceBaseDir, "*")) if err != nil { log.Printf("failed to list app directories to prune: %v", err) } for _, dirName := range foundDirs { found := false for _, app := range registeredApps.Apps { if app.Name == path.Base(dirName) { found = true break } } if !found { appName := path.Base(dirName) log.Printf("removing app: %s", appName) // Stop the pod watcher appContexts[appName].PodWatcherRunning = false // Remove app context delete(appContexts, appName) // Remove app from checksum cache delete(manifestChecksums, appName) // Delete the app namespace cmd := exec.Command("sh", "-o", "pipefail", "-c", fmt.Sprintf("kubectl delete --wait=false ns %s", appName)) stdoutStderr, err := cmd.CombinedOutput() if err != nil { log.Printf("error calling kubectl to delete namespace %s: %v\n%s", appName, err, stdoutStderr) } // Delete the app directory os.RemoveAll(dirName) } } time.Sleep(scanPeriod) } }() server.InitDispatch() log.Printf("Initializing request routes...\n") server.Urls["metadata"] = func(w http.ResponseWriter, r *http.Request) { srcIP := strings.Split(r.RemoteAddr, ":")[0] // Check reserved pods to match requestor IP. 
for _, appCtx := range appContexts { for user, pod := range appCtx.ReservedPods { if pod.IP == srcIP { metadata := broker.ReservationMetadataSpec{ IP: pod.IP, SessionKey: pod.SessionKey, User: user, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(metadata) return } } } writeResponse(w, http.StatusNotFound, fmt.Sprintf("reservation metadata not found for IP: %s", srcIP)) } server.Start() } func (s *Server) Start() { log.Printf("Starting server on port: %s \n", s.Port) http.ListenAndServe(":"+s.Port, s.Dispatcher) } func (s *Server) InitDispatch() { d := s.Dispatcher d.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { writeResponse(w, http.StatusOK, "OK") }) d.HandleFunc("/{appName}/", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) appName := vars["appName"] s.ProxyCall(w, r, appName) }) } func (s *Server) ProxyCall(w http.ResponseWriter, r *http.Request, fName string) { if s.Urls[fName] != nil { s.Urls[fName](w, r) } } func writeResponse(w http.ResponseWriter, statusCode int, message string) { status := broker.StatusResponse{ Code: statusCode, Status: message, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) json.NewEncoder(w).Encode(status) } /* Registers handler for app. Handler dispatches requests for HTTP verbs, POST, DELETE, GET. 
*/
// The user is taken from the app cookie or the auth header, with any
// "prefix:" removed (IAP uses a prefix of accounts.google.com:email).
// Methods other than POST, DELETE and GET get 405. s.Urls is written
// without locking here; the caller must synchronize with concurrent readers.
func registerAppHandler(s *Server, app broker.AppConfigSpec, appCtx *AppContext) {
	appName := app.Name
	cookieName := fmt.Sprintf("broker_%s", appName)

	s.Urls[appName] = func(w http.ResponseWriter, r *http.Request) {
		// Get user from cookie or header.
		user := broker.GetUserFromCookieOrAuthHeader(r, cookieName, appCtx.AuthHeaderName)
		if len(user) == 0 {
			writeResponse(w, http.StatusBadRequest, "Failed to get user from cookie or auth header")
			return
		}
		// IAP uses a prefix of accounts.google.com:email, remove this to just get the email.
		userToks := strings.Split(user, ":")
		user = userToks[len(userToks)-1]

		username := broker.GetUsernameFromHeaderOrDefault(r, appCtx.UsernameHeader, user)

		var status int
		var msg string
		switch r.Method {
		case http.MethodPost:
			status, msg = createApp(app, appCtx, user, username)
		case http.MethodDelete:
			status, msg = deleteApp(app, appCtx, user, username)
		case http.MethodGet:
			status, msg = getAppStatus(w, app, appCtx, user, username)
		default:
			w.Header().Set("Allow", "GET, POST, DELETE")
			status, msg = http.StatusMethodNotAllowed, fmt.Sprintf("method not allowed: %s", r.Method)
		}
		writeResponse(w, status, msg)
	}
}

/*
Watches a deployment for pods.

It marks the watcher running, seeds appCtx.ReservedPods from existing
reservations, then starts a goroutine that refreshes appCtx.AvailablePods
every 2 seconds until appCtx.PodWatcherRunning is set to false.
TODO: convert this to use the K8S watch API.
*/
func watchPods(app broker.AppConfigSpec, appCtx *AppContext) {
	appCtx.Lock()
	appCtx.PodWatcherRunning = true
	appCtx.Unlock()

	loadReservedPods(app, appCtx)

	go func() {
		log.Printf("started pod watcher for %s", app.Name)
		// Available pods are those currently managed by the Deployment.
		selector := fmt.Sprintf("%s,app.kubernetes.io/managed-by=pod-broker", app.Deployment.Selector)
		for refreshAvailablePods(app, appCtx, selector) {
			// Sleep without holding the lock so requests are not blocked.
			time.Sleep(2 * time.Second)
		}
		log.Printf("stopped pod watcher for %s", app.Name)
	}()
}

// loadReservedPods records existing reservations: non-terminating pods that
// match the app selector, are not managed by the Deployment, and carry an
// app.broker/user annotation.
func loadReservedPods(app broker.AppConfigSpec, appCtx *AppContext) {
	selector := fmt.Sprintf("%s, app.kubernetes.io/managed-by notin (pod-broker)", app.Deployment.Selector)
	podResp, err := listBrokerPods(app.Name, selector)
	if err != nil {
		log.Printf("failed to list initial reserved pods: %v", err)
		return
	}

	appCtx.Lock()
	defer appCtx.Unlock()
	for _, pod := range podResp.Items {
		if pod.Metadata.DeletionTimestamp != nil {
			// Skip terminating pods.
			continue
		}
		podName := pod.Metadata.Name
		podUser, ok := pod.Metadata.Annotations["app.broker/user"]
		if !ok {
			continue
		}

		// A pod may report no IPs; keep the reservation (so the user can still
		// release it) rather than indexing an empty slice.
		podIP := ""
		if len(pod.Status.PodIPs) > 0 {
			podIP = pod.Status.PodIPs[0].IP
		} else {
			log.Printf("Warning: existing reservation has no pod IP: %s", podName)
		}

		sessionKey, ok := pod.Metadata.Annotations["app.broker/session-key"]
		if !ok {
			log.Printf("Warning: missing app.broker/session-key on existing reservation: %s", podName)
		}

		// strings.Split("", ",") yields [""], which would later produce an empty
		// object type in deleteApp, so only split a non-empty annotation.
		var userObjects []string
		if types := pod.Metadata.Annotations["app.broker/last-applied-object-types"]; types != "" {
			userObjects = strings.Split(types, ",")
		}

		log.Printf("Found existing reservation: %s: %s", podName, podUser)
		appCtx.ReservedPods[podUser] = BrokerPod{
			Name:        podName,
			IP:          podIP,
			SessionKey:  sessionKey,
			UserObjects: userObjects,
		}
	}
}

// refreshAvailablePods replaces appCtx.AvailablePods with the non-terminating,
// IP-assigned pods matching selector, oldest first. It returns false once the
// watcher has been stopped. On a listing error the previous list is kept.
func refreshAvailablePods(app broker.AppConfigSpec, appCtx *AppContext, selector string) bool {
	// The lock is held across the listing so the result cannot be stale with
	// respect to createApp, which relabels reserved pods under the same lock.
	appCtx.Lock()
	defer appCtx.Unlock()

	if !appCtx.PodWatcherRunning {
		return false
	}

	podResp, err := listBrokerPods(app.Name, selector)
	if err != nil {
		log.Printf("failed to list pods for app: %s: %v", app.Name, err)
		return true
	}

	// Sort by creation time, ascending (oldest first). Assumes the Kubernetes
	// RFC 3339 timestamp format, which orders lexically by time.
	items := podResp.Items
	sort.Slice(items, func(i, j int) bool {
		return items[i].Metadata.CreationTimestamp < items[j].Metadata.CreationTimestamp
	})

	available := make([]BrokerPod, 0, len(items))
	for _, pod := range items {
		if len(pod.Status.PodIPs) == 0 || pod.Metadata.DeletionTimestamp != nil {
			// Skip pods without an IP and terminating pods.
			continue
		}
		available = append(available, BrokerPod{
			Name: pod.Metadata.Name,
			IP:   pod.Status.PodIPs[0].IP,
		})
	}
	appCtx.AvailablePods = available
	return true
}

// runKubectl runs kubectl with args directly (no shell, so argument values
// are never interpreted) and returns its combined stdout and stderr.
func runKubectl(args ...string) ([]byte, error) {
	return exec.Command("kubectl", args...).CombinedOutput()
}

// updatePodForUser takes pod out of the Deployment controller and tags it
// for user: it relabels managed-by, sets the user, session-key and
// last-applied-object-types annotations, then sets the instance label.
func updatePodForUser(app broker.AppConfigSpec, user, sessionKey, pod string, objectTypes []string) error {
	instanceID := fmt.Sprintf("%s-%s", app.Name, broker.MakePodID(user))
	steps := [][]string{
		// Remove label from the pod that releases it from the K8S Deployment controller.
		{"label", "pod", "-n", app.Name, pod, "app.kubernetes.io/managed-by=reservation-broker", "--overwrite=true"},
		// Add user, session key and object type annotations.
		{"annotate", "pod", "--overwrite=true", "-n", app.Name, pod,
			"app.broker/user=" + user,
			"app.broker/session-key=" + sessionKey,
			"app.broker/last-applied-object-types=" + strings.Join(objectTypes, ","),
		},
		// Add label for instance ID.
		{"label", "pod", "-n", app.Name, pod, "app.kubernetes.io/instance=" + instanceID, "--overwrite=true"},
	}
	for _, args := range steps {
		if stdoutStderr, err := runKubectl(args...); err != nil {
			return fmt.Errorf("%s\n%v", stdoutStderr, err)
		}
	}
	return nil
}

/*
Get status of reservation.

Returns 201 while the user's pod is "waiting", 200 otherwise, and 500 if the
status lookup fails. When the pod is "ready" the app cookie is also set on w.
*/
func getAppStatus(w http.ResponseWriter, app broker.AppConfigSpec, appCtx *AppContext, user, username string) (int, string) {
	instanceID := fmt.Sprintf("%s-%s", app.Name, broker.MakePodID(user))
	selector := fmt.Sprintf("app.kubernetes.io/instance=%s", instanceID)

	status, err := broker.GetPodStatus(app.Name, selector)
	if err != nil {
		log.Printf("failed to get pod status for selector: %s: %v", selector, err)
		return http.StatusInternalServerError, "error fetching status"
	}

	switch status.Status {
	case "waiting":
		return http.StatusCreated, status.Status
	case "ready":
		cookieName := fmt.Sprintf("broker_%s", app.Name)
		cookieValue := broker.MakeCookieValue(user, appCtx.CookieSecret)
		appPath := fmt.Sprintf("/%s/", app.Name)
		broker.SetCookie(w, cookieName, cookieValue, appPath, maxCookieAgeSeconds)
	}
	return http.StatusOK, status.Status
}

/*
Obtain a reservation for the user.

The whole operation runs under appCtx's lock so reservations are atomic and a
user cannot reserve more than one pod. If tagging the pod or applying the
per-user manifests fails, the pod is deleted (best effort) so it is not left
tagged for a user without being in the reservation table.
*/
func createApp(app broker.AppConfigSpec, appCtx *AppContext, user, username string) (int, string) {
	appCtx.Lock()
	defer appCtx.Unlock()

	if pod, ok := appCtx.ReservedPods[user]; ok {
		return http.StatusOK, fmt.Sprintf("pod for %s: %s", user, pod.Name)
	}

	if len(appCtx.AvailablePods) == 0 {
		return http.StatusNotFound, "No available instances at this time"
	}

	// Assign user a pod and remove it from the list.
	pod := appCtx.AvailablePods[0]
	appCtx.AvailablePods = appCtx.AvailablePods[1:]
	pod.SessionKey = broker.MakeSessionKey()

	// Build the per-user manifest templates.
	destDir, err := buildUserBundle(app, appCtx, user, username, pod)
	if err != nil {
		log.Printf("failed to build user bundle for %s/%s: %v", app.Name, user, err)
		return http.StatusInternalServerError, "error creating app"
	}

	// Record the unique kinds of objects being applied in an annotation,
	// because 'kubectl delete all' does not capture things like CRDs or
	// VirtualService, so objects can get orphaned.
	// See also: https://github.com/kubernetes/kubectl/issues/151
	userObjects, err := broker.GetObjectTypes(destDir)
	if err != nil {
		log.Printf("failed to determine object types in bundle: %v", err)
		return http.StatusInternalServerError, "error creating app"
	}
	pod.UserObjects = userObjects

	// Update the pod for the user.
	if err := updatePodForUser(app, user, pod.SessionKey, pod.Name, pod.UserObjects); err != nil {
		log.Printf("failed to update pod for user %s: %s: %v", user, pod.Name, err)
		releaseFailedReservation(app, pod.Name)
		return http.StatusInternalServerError, "error creating app"
	}

	// Apply the per-user manifests. destDir embeds the user name, so it is
	// passed as a positional shell parameter instead of being interpolated.
	cmd := exec.Command("sh", "-o", "pipefail", "-c", `kustomize build "$1" | kubectl apply -f -`, "sh", destDir)
	cmd.Dir = destDir
	if stdoutStderr, err := cmd.CombinedOutput(); err != nil {
		log.Printf("error applying per-user manifests for %s: %v\n%s", user, err, stdoutStderr)
		releaseFailedReservation(app, pod.Name)
		return http.StatusInternalServerError, "error creating app"
	}

	log.Printf("assigned pod %s to user: %s", pod.Name, user)

	// Reserve pod for user in map.
	appCtx.ReservedPods[user] = pod

	return http.StatusOK, fmt.Sprintf("assigned pod: %s", pod.Name)
}

// releaseFailedReservation deletes a pod whose reservation could not be
// completed. Best effort: failures are only logged.
func releaseFailedReservation(app broker.AppConfigSpec, podName string) {
	if stdoutStderr, err := runKubectl("delete", "pod", "-n", app.Name, podName, "--wait=false", "--ignore-not-found"); err != nil {
		log.Printf("warning: failed to delete pod %s after failed reservation: %s\n%v", podName, stdoutStderr, err)
	}
}

/*
Builds templates for user specific manifest.

Renders the app's user bundle into BuildSourceBaseDirUser/<user>/<app> and
returns that directory. user comes from a request header or cookie, so it must
be a single path segment to keep the output under the base directory.
*/
func buildUserBundle(app broker.AppConfigSpec, appCtx *AppContext, user, username string, pod BrokerPod) (string, error) {
	if user == "" || user == "." || user == ".." || strings.Contains(user, "/") {
		return "", fmt.Errorf("invalid user for bundle path: %q", user)
	}

	data := appCtx.PodData
	data.User = user
	data.Username = username
	data.CookieValue = broker.MakeCookieValue(user, appCtx.CookieSecret)
	data.ID = broker.MakePodID(user)
	data.FullName = fmt.Sprintf("%s-%s", app.Name, data.ID)
	data.Timestamp = fmt.Sprintf("%d", time.Now().Unix())
	data.Resources = make([]string, 0)
	data.Patches = make([]string, 0)
	data.JSONPatchesService = make([]string, 0)
	data.JSONPatchesVirtualService = make([]string, 0)
	data.JSONPatchesDeploy = make([]string, 0)

	// data is a shallow copy whose AppParams map is shared with
	// appCtx.PodData; copy it before adding the per-user sessionKey.
	appParams := make(map[string]string, len(data.AppParams)+1)
	for k, v := range data.AppParams {
		appParams[k] = v
	}
	appParams["sessionKey"] = pod.SessionKey
	data.AppParams = appParams

	srcDirUser := path.Join(broker.UserBundleSourceBaseDir, app.Name)
	destDirUser := path.Join(broker.BuildSourceBaseDirUser, user, app.Name)

	if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirDeploymentUser, srcDirUser, destDirUser, &data); err != nil {
		return "", err
	}

	return destDirUser, nil
}

/*
Release a reservation and delete the pod.

The reservation is removed from the table (under appCtx's lock) even if the
cleanup below fails. Responds "shutdown" with 200 on success or when the user
has no reservation.
*/
func deleteApp(app broker.AppConfigSpec, appCtx *AppContext, user, username string) (int, string) {
	appCtx.Lock()
	defer appCtx.Unlock()

	bPod, ok := appCtx.ReservedPods[user]
	if !ok {
		return http.StatusOK, "shutdown"
	}
	// Remove the reservation from the table while still holding the lock.
	delete(appCtx.ReservedPods, user)

	podName := bPod.Name

	// Remove instance label from the pod.
	// This is done so that subsequent GET requests don't return the terminating pod.
	if stdoutStderr, err := runKubectl("label", "pod", "-n", app.Name, podName, "app.kubernetes.io/instance-"); err != nil {
		log.Printf("warning: failed to remove instance label from pod: %s: %s\n%v", podName, stdoutStderr, err)
	}

	// Delete the pod from K8S; an already-deleted pod is not an error.
	log.Printf("deleting pod for user %s: %s", user, podName)
	if stdoutStderr, err := runKubectl("delete", "pod", "-n", app.Name, podName, "--wait=false", "--ignore-not-found"); err != nil {
		log.Printf("failed to delete pod for user %s: %s: %s\n%v", user, podName, stdoutStderr, err)
		return http.StatusInternalServerError, "error deleting app"
	}

	// Delete the per-user resources, ignoring empty object type entries.
	objectTypes := make([]string, 0, len(bPod.UserObjects))
	for _, t := range bPod.UserObjects {
		if t != "" {
			objectTypes = append(objectTypes, t)
		}
	}
	if len(objectTypes) > 0 {
		fullName := fmt.Sprintf("%s-%s", app.Name, broker.MakePodID(user))
		stdoutStderr, err := runKubectl("delete", strings.Join(objectTypes, ","), "-n", app.Name,
			"-l", "app.kubernetes.io/instance="+fullName, "--wait=false")
		if err != nil {
			log.Printf("error deleting per-user resources for %s: %v\n%s", user, err, stdoutStderr)
			return http.StatusInternalServerError, "error deleting app"
		}
	}

	return http.StatusOK, "shutdown"
}

// listBrokerPods runs `kubectl get pod -n namespace -l selector -o json` and
// decodes the result. Only stdout is parsed, so kubectl warnings on stderr
// cannot corrupt the JSON; stderr is included in the error on failure.
func listBrokerPods(namespace, selector string) (GetPodsSpec, error) {
	out, err := exec.Command("kubectl", "get", "pod", "-n", namespace, "-l", selector, "-o", "json").Output()
	if err != nil {
		stderr := ""
		if exitErr, ok := err.(*exec.ExitError); ok {
			stderr = string(exitErr.Stderr)
		}
		return GetPodsSpec{}, fmt.Errorf("failed to list pods: %s, %w", stderr, err)
	}

	var podResp GetPodsSpec
	if err := json.Unmarshal(out, &podResp); err != nil {
		return GetPodsSpec{}, fmt.Errorf("failed to parse pod list: %w", err)
	}
	return podResp, nil
}