images/controller/cmd/pod_broker/pod_broker.go (529 lines of code) (raw):

/* Copyright 2019 Google Inc. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package main import ( "crypto/sha1" "encoding/json" "fmt" "io" "log" "math/rand" "net/http" "net/http/httputil" "os" "os/exec" "path" "regexp" "strings" "time" broker "selkies.io/controller/pkg" ) // Cookie max-age in seconds, 5 days. const maxCookieAgeSeconds = 432000 func main() { cookieSecret := os.Getenv("COOKIE_SECRET") if len(cookieSecret) == 0 { // Generate random secret log.Printf("no COOKIE_SECRET env var found, generating random secret value") h := sha1.New() io.WriteString(h, fmt.Sprintf("%d.%d", rand.Intn(10000), int32(time.Now().Unix()))) cookieSecret = fmt.Sprintf("%x", h.Sum(nil)) } clientID := os.Getenv("OAUTH_CLIENT_ID") if len(clientID) == 0 { log.Fatalf("missing env, OAUTH_CLIENT_ID") } // Values available to templates from environment variables prefixed with POD_BROKER_PARAM_Name=Value // Map of Name=Value sysParams := broker.GetEnvPrefixedVars("POD_BROKER_PARAM_") // Project ID from instance metadata projectID, err := broker.GetProjectID() if err != nil { log.Fatalf("failed to determine project ID: %v", err) } // Region from instance metadata brokerRegion, err := broker.GetInstanceRegion() if err != nil { log.Fatalf("failed to determine broker region: %v", err) } // Title from params brokerName, ok := sysParams["Title"] if !ok { brokerName = "App Launcher" log.Printf("using default broker title: %s", brokerName) } // Theme from params brokerTheme, ok := sysParams["Theme"] if !ok { brokerTheme = "light" log.Printf("using default broker theme: %s", brokerTheme) } // Domain from params domain, ok := sysParams["Domain"] if !ok { log.Fatal("Missing POD_BROKER_PARAM_Domain env.") } // AuthHeader from params authHeaderName, ok := sysParams["AuthHeader"] if !ok { log.Fatal("Missing POD_BROKER_PARAM_AuthHeader env.") } usernameHeader, _ := sysParams["UsernameHeader"] // Authorized user image repo pattern regexp. allowedRepoPatternParam, ok := sysParams["AuthorizedUserRepoPattern"] if !ok { log.Fatal("Missing POD_BROKER_PARAM_AuthorizedUserRepoPattern env.") } allowedRepoPattern := regexp.MustCompile(allowedRepoPatternParam) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if _, ok := sysParams["Debug"]; ok { data, _ := httputil.DumpRequest(r, false) log.Println(string(data)) } // Discover apps from their config specs located on the filesystem. // TODO: look into caching this, large number of apps and http requests can slow down the broker. registeredApps, err := broker.NewRegisteredAppManifestFromJSON(broker.RegisteredAppsManifestJSONFile, broker.AppTypeAll) if err != nil { log.Printf("failed to parse registered app manifest: %v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } // Extract app name from path reqApp := strings.Split(r.URL.Path, "/")[1] create := false shutdown := false getStatus := false if r.URL.Path == "/" { // Get user from header. At this time the per-app cookie has not been set and is not required. user := broker.GetUserFromCookieOrAuthHeader(r, "", authHeaderName) if len(user) == 0 { writeResponse(w, http.StatusBadRequest, fmt.Sprintf("Failed to get user from auth header")) return } // IAP uses a prefix of accounts.google.com:email, remove this to just get the email userToks := strings.Split(user, ":") user = userToks[len(userToks)-1] username := broker.GetUsernameFromHeaderOrDefault(r, usernameHeader, user) // Return list of apps appList := broker.AppListResponse{ BrokerName: brokerName, BrokerTheme: brokerTheme, BrokerRegion: brokerRegion, Apps: make([]broker.AppDataResponse, 0), User: user, } for _, app := range registeredApps.Apps { if app.UserParams == nil { // default user params to empty list. app.UserParams = make([]broker.AppConfigParam, 0) } if len(app.LaunchURL) == 0 { // default launch url to prefixed path. app.LaunchURL = fmt.Sprintf("/%s/", app.Name) } // App is editable if user is in the list of editors. editable := false for _, appEditor := range app.Editors { re, err := regexp.Compile(appEditor) if err != nil { log.Printf("failed to parse app editor as regexp: '%s'.", appEditor) continue } if re.MatchString(user) { editable = true break } } // Filter app by authorizedUsers if present. if len(app.AuthorizedUsers) > 0 { found := false for _, u := range app.AuthorizedUsers { re, err := regexp.Compile(u) if err != nil { log.Printf("failed to parse authorizedUser as regexp: '%s', skipping app.", u) break } if re.MatchString(user) || re.MatchString(username) { found = true break } } if !found { // Skip this app. continue } } appData := broker.AppDataResponse{ Name: app.Name, Type: app.Type, DisplayName: app.DisplayName, Description: app.Description, Icon: app.Icon, LaunchURL: app.LaunchURL, DefaultRepo: app.DefaultRepo, DefaultTag: app.DefaultTag, Params: app.UserParams, DefaultTier: app.DefaultTier, NodeTiers: app.NodeTierNames(), Editable: editable, DisableOptions: app.DisableOptions, } appList.Apps = append(appList.Apps, appData) } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(appList) return } // Get app spec from parsed apps. app, ok := registeredApps.Apps[reqApp] if !ok { log.Printf("app not found: %s", reqApp) writeResponse(w, http.StatusNotFound, "app not found") return } if app.Type != broker.AppTypeStatefulSet { w.Header().Set("Location", fmt.Sprintf("/reservation-broker/%s/", app.Name)) writeResponse(w, http.StatusFound, "app is reservation type") return } appName := app.Name cookieName := fmt.Sprintf("broker_%s", appName) switch r.Method { case "POST": create = true case "DELETE": shutdown = true case "GET": getStatus = true } // Get user from cookie or header user := broker.GetUserFromCookieOrAuthHeader(r, cookieName, authHeaderName) if len(user) == 0 { writeResponse(w, http.StatusBadRequest, fmt.Sprintf("Failed to get user from cookie or auth header")) return } // IAP uses a prefix of accounts.google.com:email, remove this to just get the email userToks := strings.Split(user, ":") user = userToks[len(userToks)-1] username := broker.GetUsernameFromHeaderOrDefault(r, usernameHeader, user) // App is editable if user is in the list of editors. editable := false for _, appEditor := range app.Editors { re, err := regexp.Compile(appEditor) if err != nil { log.Printf("failed to parse app editor as regexp: '%s'.", appEditor) continue } if re.MatchString(user) || re.MatchString(username) { editable = true break } } // Check per-app user authorization if present. if len(app.AuthorizedUsers) > 0 { found := false for _, u := range app.AuthorizedUsers { re, err := regexp.Compile(u) if err != nil { log.Printf("failed to parse authorizedUser as regexp: '%s', skipping app.", u) break } if re.MatchString(user) || re.MatchString(username) { found = true break } } if !found { writeResponse(w, http.StatusUnauthorized, fmt.Sprintf("user is not authorized")) return } } // Compute pod ID from user and app, must conform to DNS-1035. id := broker.MakePodID(user) namespace := fmt.Sprintf("user-%s", id) fullName := fmt.Sprintf("%s-%s", appName, id) destDir := path.Join(broker.BuildSourceBaseDir, user, appName) cookieValue := broker.MakeCookieValue(user, cookieSecret) userConfigFile := path.Join(broker.AppUserConfigBaseDir, appName, user, broker.AppUserConfigJSONFile) ts := fmt.Sprintf("%d", time.Now().Unix()) // Fetch user config, only use user options if spec.disableOptions is false. userConfig, err := broker.GetAppUserConfig(userConfigFile) if app.DisableOptions || err != nil { // config does not exist yet, generate default. defaultAppParams := make(map[string]string, 0) for _, param := range app.UserParams { defaultAppParams[param.Name] = param.Default } // Create new user config field with default spec userConfig = broker.NewAppUserConfig(fullName, namespace, broker.AppUserConfigSpec{ AppName: appName, User: user, ImageRepo: app.DefaultRepo, ImageTag: app.DefaultTag, Tags: []string{app.DefaultTag}, NodeTier: app.DefaultTier, Params: defaultAppParams, }) } userNSData := &broker.UserPodData{ Namespace: namespace, ProjectID: projectID, AppSpec: app, User: user, Timestamp: ts, } srcDirUser := path.Join(broker.UserBundleSourceBaseDir, appName) destDirUser := path.Join(broker.BuildSourceBaseDirNS, user) // Handle requests for per-app metadata requests if regexp.MustCompile(fmt.Sprintf(".*%s/metadata/?$", appName)).MatchString(r.URL.Path) { // Fetch pod status status, err := broker.GetPodStatus(namespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName)) if err != nil { log.Printf("failed to get pod status: %v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } ip := "" sessionKey := "" if len(status.PodIPs) > 0 { ip = status.PodIPs[0] } if len(status.SessionKeys) > 0 { sessionKey = status.SessionKeys[0] } metadata := broker.ReservationMetadataSpec{ IP: ip, SessionKey: sessionKey, User: user, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(metadata) return } // Handle requests for per-app user configs if regexp.MustCompile(fmt.Sprintf(".*%s/config/?$", appName)).MatchString(r.URL.Path) { if getStatus { statusCode := http.StatusOK w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) json.NewEncoder(w).Encode(userConfig.Spec) return } else if create { // Read JSON body if r.Header.Get("content-type") != "application/json" { writeResponse(w, http.StatusBadRequest, "invalid content-type") return } // Verify config is allowed to be set per broker app config. if app.DisableOptions { writeResponse(w, http.StatusBadRequest, "user config cannot be modified at this time") return } var inputConfigSpec broker.AppUserConfigSpec err := json.NewDecoder(r.Body).Decode(&inputConfigSpec) if err != nil { writeResponse(w, http.StatusBadRequest, "invalid app user config") return } // Overwrite immutable fields. inputConfigSpec.AppName = appName inputConfigSpec.User = user inputConfigSpec.Tags = userConfig.Spec.Tags if len(inputConfigSpec.ImageRepo) == 0 { writeResponse(w, http.StatusBadRequest, "missing config field: imageRepo") return } if len(inputConfigSpec.ImageTag) == 0 { writeResponse(w, http.StatusBadRequest, "missing config field: imageTag") return } if len(inputConfigSpec.NodeTier) == 0 { writeResponse(w, http.StatusBadRequest, "missing config field: nodeTier") return } foundTier := false for _, tier := range app.NodeTiers { if inputConfigSpec.NodeTier == tier.Name { foundTier = true } } if !foundTier { writeResponse(w, http.StatusBadRequest, fmt.Sprintf("invalid node tier: %s", inputConfigSpec.NodeTier)) return } // Verifiy image repo and tag exists if it was changed. if inputConfigSpec.ImageRepo != userConfig.Spec.ImageRepo || inputConfigSpec.ImageTag != userConfig.Spec.ImageTag { log.Printf("validating user image repo: %s:%s", inputConfigSpec.ImageRepo, inputConfigSpec.ImageTag) if err := broker.ValidateImageRepo(inputConfigSpec.ImageRepo, inputConfigSpec.ImageTag, allowedRepoPattern); err != nil { log.Printf("user %s config image validation failed: %v", user, err) writeResponse(w, http.StatusBadRequest, fmt.Sprintf("%v", err)) return } } // Verify parameters are valid for this app. for paramName := range inputConfigSpec.Params { found := false for _, supportedParamName := range app.UserParams { if paramName == supportedParamName.Name { found = true break } } if !found { msg := fmt.Sprintf("user %s config image validation failed: invalid parameter '%s", user, paramName) log.Printf(msg) writeResponse(w, http.StatusBadRequest, msg) return } } // Set user config spec to validated input spec. userConfig.Spec = inputConfigSpec // Save config to local file. if err := userConfig.WriteJSON(userConfigFile); err != nil { log.Printf("failed to save copy of user config: %v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } // Build user namespace template. if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetUser, srcDirUser, destDirUser, userNSData); err != nil { log.Printf("%v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } // Apply config to cluster cmd := exec.Command("sh", "-o", "pipefail", "-c", fmt.Sprintf("kustomize build %s | kubectl apply -f - && kubectl apply -f %s", destDirUser, userConfigFile)) cmd.Dir = path.Dir(destDir) stdoutStderr, err := cmd.CombinedOutput() if err != nil { log.Printf("error calling kubectl to apply user config for %s: %v\n%s", user, err, stdoutStderr) writeResponse(w, http.StatusInternalServerError, "internal server error") return } writeResponse(w, http.StatusOK, "user config updated") } else { writeResponse(w, http.StatusBadRequest, "only POST method is supported.") return } return } // Extract query parameters // Note that only the first instance of a repeated query param is used. queryParams := make(map[string]string, len(r.URL.Query())) for k, v := range r.URL.Query() { queryParams[k] = v[0] } // Map named tier to NodeTierSpec for pod template. // Spec contains node affinity labels and resources used in pod templates. var nodeTierSpec broker.NodeTierSpec found := false for _, tier := range app.NodeTiers { if tier.Name == userConfig.Spec.NodeTier { nodeTierSpec = tier found = true break } } if !found { log.Printf("failed to map config tier '%s' to NodeTierSpec", userConfig.Spec.NodeTier) writeResponse(w, http.StatusInternalServerError, "internal server error") return } // Build map of app params from app config spec appParams := make(map[string]string, 0) for _, param := range app.AppParams { appParams[param.Name] = param.Default } // Generate session key. Add to AppParams if _, ok := appParams["sessionKey"]; !ok { appParams["sessionKey"] = broker.MakeSessionKey() } data := &broker.UserPodData{ Namespace: namespace, ProjectID: projectID, ClientID: clientID, AppSpec: app, AppUserConfig: userConfig.Spec, App: appName, ImageRepo: userConfig.Spec.ImageRepo, ImageTag: userConfig.Spec.ImageTag, NodeTier: nodeTierSpec, Domain: domain, User: user, Username: username, CookieValue: cookieValue, ID: id, FullName: fullName, ServiceName: app.ServiceName, Resources: []string{}, Patches: []string{}, JSONPatchesService: []string{}, JSONPatchesVirtualService: []string{}, JSONPatchesDeploy: []string{}, UserParams: userConfig.Spec.Params, AppParams: appParams, SysParams: sysParams, NetworkPolicyData: registeredApps.NetworkPolicyData, Timestamp: ts, Region: brokerRegion, Editable: editable, } appPath := fmt.Sprintf("/%s/", appName) // Build user application bundle. srcDirApp := path.Join(broker.BundleSourceBaseDir, app.Name) if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetApp, srcDirApp, destDir, data); err != nil { log.Printf("%v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } // Build patch that adds list of applied objects to the statefulset. if err := broker.GenerateObjectTypePatch(destDir); err != nil { log.Printf("%v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } // Build user namespace template. if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetUser, srcDirUser, destDirUser, userNSData); err != nil { log.Printf("%v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } if shutdown { if _, err := os.Stat(destDir); os.IsNotExist(err) { writeResponse(w, http.StatusBadRequest, "shutdown") return } for i, hook := range app.ShutdownHooks { selector := strings.Join([]string{"app.kubernetes.io/instance=" + fullName, hook.Selector}, ",") log.Printf("executing shutdown hook %d/%d for %s, selector=%s, container=%s, command=%s", i+1, len(app.ShutdownHooks), fullName, selector, hook.Container, hook.Command) if err := broker.ExecPodCommand(namespace, selector, hook.Container, hook.Command); err != nil { log.Printf("error calling shutdown hook: %v", err) } } // Fetch pod status to retrieve the list of object types. status, err := broker.GetPodStatus(namespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName)) if err != nil { log.Printf("failed to get pod status: %v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } if len(status.BrokerObjects) > 0 { log.Printf("shutting down %s pod for user: %s", appName, user) objectTypes := strings.Join(status.BrokerObjects, ",") cmd := exec.Command("sh", "-o", "pipefail", "-c", fmt.Sprintf("kubectl delete %s -n %s -l \"app.kubernetes.io/instance=%s\" --wait=false", objectTypes, namespace, fullName)) cmd.Dir = destDir stdoutStderr, err := cmd.CombinedOutput() if err != nil { log.Printf("error calling kubectl for %s: %v\n%s", user, err, stdoutStderr) writeResponse(w, http.StatusInternalServerError, "internal server error") return } } // Delete the cookie by setting max-age to -1 broker.SetCookie(w, cookieName, cookieValue, appPath, -1) writeResponse(w, http.StatusAccepted, "terminating") return } if getStatus { // Get pod status based on conditions. status, err := broker.GetPodStatus(namespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName)) if err != nil { log.Printf("failed to get pod ips: %v", err) writeResponse(w, http.StatusInternalServerError, "internal server error") return } statusCode := http.StatusOK if status.Status == "waiting" { statusCode = http.StatusCreated } if status.Status == "ready" { broker.SetCookie(w, cookieName, cookieValue, appPath, maxCookieAgeSeconds) } if redirectURL, ok := queryParams["r"]; ok { // Add header to redirect. w.Header().Set("Location", redirectURL) statusCode = http.StatusTemporaryRedirect } w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) json.NewEncoder(w).Encode(status) return } if create { log.Printf("creating pod for user: %s: %s", user, fullName) cmd := exec.Command("sh", "-o", "pipefail", "-c", fmt.Sprintf("kustomize build %s | kubectl apply -f - && kustomize build %s | kubectl apply -f -", destDirUser, destDir)) cmd.Dir = destDir stdoutStderr, err := cmd.CombinedOutput() if err != nil { log.Printf("error calling kubectl for %s: %v\n%s", user, err, stdoutStderr) writeResponse(w, http.StatusInternalServerError, "internal server error") return } broker.SetCookie(w, cookieName, cookieValue, appPath, maxCookieAgeSeconds) writeResponse(w, http.StatusAccepted, "created") log.Printf("pod created for user: %s: %s", user, fullName) } }) log.Println("Listening on port 8080") log.Fatal(http.ListenAndServe(":8080", nil)) } func writeResponse(w http.ResponseWriter, statusCode int, message string) { status := broker.StatusResponse{ Code: statusCode, Status: message, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) json.NewEncoder(w).Encode(status) } func writeResponseWithIPs(w http.ResponseWriter, statusCode int, message string, ips []string) { status := broker.StatusResponse{ Status: message, PodIPs: ips, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) json.NewEncoder(w).Encode(status) }