func main()

in images/controller/cmd/pod_broker/pod_broker.go [41:673]


// main configures the pod broker from environment variables and instance
// metadata, registers a single catch-all HTTP handler and serves on :8080.
//
// Routes served by the handler:
//   - "/": JSON list of the registered apps visible to the requesting user.
//   - "/<app>/metadata": JSON with the pod IP and session key of the user's app instance.
//   - "/<app>/config": GET returns the user's config spec, POST validates, saves and applies a new one.
//   - any other "/<app>/..." path: POST creates, GET reports status, DELETE shuts down the user's app instance.
//
// Apps whose type is not broker.AppTypeStatefulSet are redirected to the
// reservation broker path.
func main() {
	cookieSecret := os.Getenv("COOKIE_SECRET")
	if len(cookieSecret) == 0 {
		// Generate random secret
		// NOTE(review): the fallback secret is derived from rand.Intn(10000) and the
		// current Unix time only, which is a small search space if `rand` is math/rand.
		// Prefer setting COOKIE_SECRET explicitly; consider crypto/rand here.
		log.Printf("no COOKIE_SECRET env var found, generating random secret value")
		h := sha1.New()
		io.WriteString(h, fmt.Sprintf("%d.%d", rand.Intn(10000), int32(time.Now().Unix())))
		cookieSecret = fmt.Sprintf("%x", h.Sum(nil))
	}

	clientID := os.Getenv("OAUTH_CLIENT_ID")
	if len(clientID) == 0 {
		log.Fatalf("missing env, OAUTH_CLIENT_ID")
	}

	// Values available to templates from environment variables prefixed with POD_BROKER_PARAM_Name=Value
	// Map of Name=Value
	sysParams := broker.GetEnvPrefixedVars("POD_BROKER_PARAM_")

	// Project ID from instance metadata
	projectID, err := broker.GetProjectID()
	if err != nil {
		log.Fatalf("failed to determine project ID: %v", err)
	}

	// Region from instance metadata
	brokerRegion, err := broker.GetInstanceRegion()
	if err != nil {
		log.Fatalf("failed to determine broker region: %v", err)
	}

	// Title from params
	brokerName, ok := sysParams["Title"]
	if !ok {
		brokerName = "App Launcher"
		log.Printf("using default broker title: %s", brokerName)
	}

	// Theme from params
	brokerTheme, ok := sysParams["Theme"]
	if !ok {
		brokerTheme = "light"
		log.Printf("using default broker theme: %s", brokerTheme)
	}

	// Domain from params
	domain, ok := sysParams["Domain"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_Domain env.")
	}

	// AuthHeader from params
	authHeaderName, ok := sysParams["AuthHeader"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_AuthHeader env.")
	}

	// Optional; empty string when the param is not set.
	usernameHeader := sysParams["UsernameHeader"]

	// Authorized user image repo pattern regexp.
	allowedRepoPatternParam, ok := sysParams["AuthorizedUserRepoPattern"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_AuthorizedUserRepoPattern env.")
	}
	allowedRepoPattern := regexp.MustCompile(allowedRepoPatternParam)

	// userIsEditor reports whether user or username matches any of the editor
	// regexp patterns. Patterns that fail to compile are logged and skipped.
	userIsEditor := func(editors []string, user, username string) bool {
		for _, pattern := range editors {
			re, err := regexp.Compile(pattern)
			if err != nil {
				log.Printf("failed to parse app editor as regexp: '%s'.", pattern)
				continue
			}
			if re.MatchString(user) || re.MatchString(username) {
				return true
			}
		}
		return false
	}

	// userIsAuthorized reports whether the user may access an app. An empty
	// pattern list means the app is unrestricted. A pattern that fails to
	// compile stops the search and denies access (fail closed).
	userIsAuthorized := func(authorizedUsers []string, user, username string) bool {
		if len(authorizedUsers) == 0 {
			return true
		}
		for _, pattern := range authorizedUsers {
			re, err := regexp.Compile(pattern)
			if err != nil {
				log.Printf("failed to parse authorizedUser as regexp: '%s', skipping app.", pattern)
				return false
			}
			if re.MatchString(user) || re.MatchString(username) {
				return true
			}
		}
		return false
	}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if _, ok := sysParams["Debug"]; ok {
			if data, err := httputil.DumpRequest(r, false); err != nil {
				log.Printf("failed to dump request: %v", err)
			} else {
				log.Println(string(data))
			}
		}

		// Discover apps from their config specs located on the filesystem.
		// TODO: look into caching this, large number of apps and http requests can slow down the broker.
		registeredApps, err := broker.NewRegisteredAppManifestFromJSON(broker.RegisteredAppsManifestJSONFile, broker.AppTypeAll)
		if err != nil {
			log.Printf("failed to parse registered app manifest: %v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Extract app name from path. Guard the index so a path without a
		// leading slash yields an empty name (and a 404 below) instead of a panic.
		reqApp := ""
		if pathToks := strings.Split(r.URL.Path, "/"); len(pathToks) > 1 {
			reqApp = pathToks[1]
		}

		if r.URL.Path == "/" {
			// Get user from header. At this time the per-app cookie has not been set and is not required.
			user := broker.GetUserFromCookieOrAuthHeader(r, "", authHeaderName)
			if len(user) == 0 {
				writeResponse(w, http.StatusBadRequest, "Failed to get user from auth header")
				return
			}
			// IAP uses a prefix of accounts.google.com:email, remove this to just get the email
			userToks := strings.Split(user, ":")
			user = userToks[len(userToks)-1]

			username := broker.GetUsernameFromHeaderOrDefault(r, usernameHeader, user)

			// Return list of apps
			appList := broker.AppListResponse{
				BrokerName:   brokerName,
				BrokerTheme:  brokerTheme,
				BrokerRegion: brokerRegion,
				Apps:         make([]broker.AppDataResponse, 0),
				User:         user,
			}

			for _, app := range registeredApps.Apps {
				if app.UserParams == nil {
					// default user params to empty list.
					app.UserParams = make([]broker.AppConfigParam, 0)
				}

				if len(app.LaunchURL) == 0 {
					// default launch url to prefixed path.
					app.LaunchURL = fmt.Sprintf("/%s/", app.Name)
				}

				// App is editable if user is in the list of editors. Matches on
				// user or username, consistent with the per-app request path below.
				editable := userIsEditor(app.Editors, user, username)

				// Filter app by authorizedUsers if present.
				if !userIsAuthorized(app.AuthorizedUsers, user, username) {
					// Skip this app.
					continue
				}

				appList.Apps = append(appList.Apps, broker.AppDataResponse{
					Name:           app.Name,
					Type:           app.Type,
					DisplayName:    app.DisplayName,
					Description:    app.Description,
					Icon:           app.Icon,
					LaunchURL:      app.LaunchURL,
					DefaultRepo:    app.DefaultRepo,
					DefaultTag:     app.DefaultTag,
					Params:         app.UserParams,
					DefaultTier:    app.DefaultTier,
					NodeTiers:      app.NodeTierNames(),
					Editable:       editable,
					DisableOptions: app.DisableOptions,
				})
			}

			w.Header().Set("Content-Type", "application/json")
			if err := json.NewEncoder(w).Encode(appList); err != nil {
				log.Printf("failed to write app list response: %v", err)
			}
			return
		}

		// Get app spec from parsed apps.
		app, ok := registeredApps.Apps[reqApp]
		if !ok {
			log.Printf("app not found: %s", reqApp)
			writeResponse(w, http.StatusNotFound, "app not found")
			return
		}

		if app.Type != broker.AppTypeStatefulSet {
			w.Header().Set("Location", fmt.Sprintf("/reservation-broker/%s/", app.Name))
			writeResponse(w, http.StatusFound, "app is reservation type")
			return
		}

		appName := app.Name

		cookieName := fmt.Sprintf("broker_%s", appName)

		// Map the HTTP method to the requested action. Any other method leaves
		// all three flags false; outside the config route such a request runs
		// the build steps below and returns without writing a response body.
		var create, shutdown, getStatus bool
		switch r.Method {
		case "POST":
			create = true
		case "DELETE":
			shutdown = true
		case "GET":
			getStatus = true
		}

		// Get user from cookie or header
		user := broker.GetUserFromCookieOrAuthHeader(r, cookieName, authHeaderName)
		if len(user) == 0 {
			writeResponse(w, http.StatusBadRequest, "Failed to get user from cookie or auth header")
			return
		}
		// IAP uses a prefix of accounts.google.com:email, remove this to just get the email
		userToks := strings.Split(user, ":")
		user = userToks[len(userToks)-1]

		username := broker.GetUsernameFromHeaderOrDefault(r, usernameHeader, user)

		// App is editable if user is in the list of editors.
		editable := userIsEditor(app.Editors, user, username)

		// Check per-app user authorization if present.
		if !userIsAuthorized(app.AuthorizedUsers, user, username) {
			writeResponse(w, http.StatusUnauthorized, "user is not authorized")
			return
		}

		// Compute pod ID from user and app, must conform to DNS-1035.
		id := broker.MakePodID(user)

		namespace := fmt.Sprintf("user-%s", id)

		fullName := fmt.Sprintf("%s-%s", appName, id)

		destDir := path.Join(broker.BuildSourceBaseDir, user, appName)

		cookieValue := broker.MakeCookieValue(user, cookieSecret)

		userConfigFile := path.Join(broker.AppUserConfigBaseDir, appName, user, broker.AppUserConfigJSONFile)

		ts := fmt.Sprintf("%d", time.Now().Unix())

		// Fetch user config, only use user options if spec.disableOptions is false.
		userConfig, err := broker.GetAppUserConfig(userConfigFile)
		if app.DisableOptions || err != nil {
			// config does not exist yet (or options are disabled), generate default.
			defaultAppParams := make(map[string]string, len(app.UserParams))
			for _, param := range app.UserParams {
				defaultAppParams[param.Name] = param.Default
			}

			// Create new user config field with default spec
			userConfig = broker.NewAppUserConfig(fullName, namespace, broker.AppUserConfigSpec{
				AppName:   appName,
				User:      user,
				ImageRepo: app.DefaultRepo,
				ImageTag:  app.DefaultTag,
				Tags:      []string{app.DefaultTag},
				NodeTier:  app.DefaultTier,
				Params:    defaultAppParams,
			})
		}

		userNSData := &broker.UserPodData{
			Namespace: namespace,
			ProjectID: projectID,
			AppSpec:   app,
			User:      user,
			Timestamp: ts,
		}
		srcDirUser := path.Join(broker.UserBundleSourceBaseDir, appName)
		destDirUser := path.Join(broker.BuildSourceBaseDirNS, user)

		// The app name is quoted so that regexp metacharacters in it are
		// matched literally and cannot break pattern compilation.
		quotedAppName := regexp.QuoteMeta(appName)

		// Handle requests for per-app metadata requests
		if regexp.MustCompile(fmt.Sprintf(".*%s/metadata/?$", quotedAppName)).MatchString(r.URL.Path) {
			// Fetch pod status
			status, err := broker.GetPodStatus(namespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName))
			if err != nil {
				log.Printf("failed to get pod status: %v", err)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			// Only the first pod IP and session key are reported.
			ip := ""
			sessionKey := ""
			if len(status.PodIPs) > 0 {
				ip = status.PodIPs[0]
			}
			if len(status.SessionKeys) > 0 {
				sessionKey = status.SessionKeys[0]
			}
			metadata := broker.ReservationMetadataSpec{
				IP:         ip,
				SessionKey: sessionKey,
				User:       user,
			}
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			if err := json.NewEncoder(w).Encode(metadata); err != nil {
				log.Printf("failed to write metadata response: %v", err)
			}
			return
		}

		// Handle requests for per-app user configs
		if regexp.MustCompile(fmt.Sprintf(".*%s/config/?$", quotedAppName)).MatchString(r.URL.Path) {
			switch {
			case getStatus:
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				if err := json.NewEncoder(w).Encode(userConfig.Spec); err != nil {
					log.Printf("failed to write user config response: %v", err)
				}
				return

			case create:
				// Read JSON body
				if r.Header.Get("content-type") != "application/json" {
					writeResponse(w, http.StatusBadRequest, "invalid content-type")
					return
				}

				// Verify config is allowed to be set per broker app config.
				if app.DisableOptions {
					writeResponse(w, http.StatusBadRequest, "user config cannot be modified at this time")
					return
				}

				var inputConfigSpec broker.AppUserConfigSpec
				if err := json.NewDecoder(r.Body).Decode(&inputConfigSpec); err != nil {
					writeResponse(w, http.StatusBadRequest, "invalid app user config")
					return
				}

				// Overwrite immutable fields.
				inputConfigSpec.AppName = appName
				inputConfigSpec.User = user
				inputConfigSpec.Tags = userConfig.Spec.Tags

				if len(inputConfigSpec.ImageRepo) == 0 {
					writeResponse(w, http.StatusBadRequest, "missing config field: imageRepo")
					return
				}

				if len(inputConfigSpec.ImageTag) == 0 {
					writeResponse(w, http.StatusBadRequest, "missing config field: imageTag")
					return
				}

				if len(inputConfigSpec.NodeTier) == 0 {
					writeResponse(w, http.StatusBadRequest, "missing config field: nodeTier")
					return
				}
				foundTier := false
				for _, tier := range app.NodeTiers {
					if inputConfigSpec.NodeTier == tier.Name {
						foundTier = true
						break
					}
				}
				if !foundTier {
					writeResponse(w, http.StatusBadRequest, fmt.Sprintf("invalid node tier: %s", inputConfigSpec.NodeTier))
					return
				}

				// Verify image repo and tag exists if it was changed.
				if inputConfigSpec.ImageRepo != userConfig.Spec.ImageRepo || inputConfigSpec.ImageTag != userConfig.Spec.ImageTag {
					log.Printf("validating user image repo: %s:%s", inputConfigSpec.ImageRepo, inputConfigSpec.ImageTag)
					if err := broker.ValidateImageRepo(inputConfigSpec.ImageRepo, inputConfigSpec.ImageTag, allowedRepoPattern); err != nil {
						log.Printf("user %s config image validation failed: %v", user, err)
						writeResponse(w, http.StatusBadRequest, fmt.Sprintf("%v", err))
						return
					}
				}

				// Verify parameters are valid for this app.
				for paramName := range inputConfigSpec.Params {
					found := false
					for _, supportedParamName := range app.UserParams {
						if paramName == supportedParamName.Name {
							found = true
							break
						}
					}
					if !found {
						msg := fmt.Sprintf("user %s config image validation failed: invalid parameter '%s'", user, paramName)
						// msg contains request data, so it must not be used as a format string.
						log.Print(msg)
						writeResponse(w, http.StatusBadRequest, msg)
						return
					}
				}

				// Set user config spec to validated input spec.
				userConfig.Spec = inputConfigSpec

				// Save config to local file.
				if err := userConfig.WriteJSON(userConfigFile); err != nil {
					log.Printf("failed to save copy of user config: %v", err)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}

				// Build user namespace template.
				if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetUser, srcDirUser, destDirUser, userNSData); err != nil {
					log.Printf("%v", err)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}

				// Apply config to cluster. Paths contain the user name, so they are
				// passed as positional parameters rather than spliced into the script.
				cmd := exec.Command("sh", "-o", "pipefail", "-c",
					`kustomize build "$1" | kubectl apply -f - && kubectl apply -f "$2"`,
					"sh", destDirUser, userConfigFile)
				cmd.Dir = path.Dir(destDir)
				stdoutStderr, err := cmd.CombinedOutput()
				if err != nil {
					log.Printf("error calling kubectl to apply user config for %s: %v\n%s", user, err, stdoutStderr)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}

				writeResponse(w, http.StatusOK, "user config updated")
				return

			default:
				writeResponse(w, http.StatusBadRequest, "only POST method is supported.")
				return
			}
		}

		// Extract query parameters
		// Note that only the first instance of a repeated query param is used.
		query := r.URL.Query()
		queryParams := make(map[string]string, len(query))
		for k, v := range query {
			if len(v) > 0 {
				queryParams[k] = v[0]
			}
		}

		// Map named tier to NodeTierSpec for pod template.
		// Spec contains node affinity labels and resources used in pod templates.
		var nodeTierSpec broker.NodeTierSpec
		found := false
		for _, tier := range app.NodeTiers {
			if tier.Name == userConfig.Spec.NodeTier {
				nodeTierSpec = tier
				found = true
				break
			}
		}
		if !found {
			log.Printf("failed to map config tier '%s' to NodeTierSpec", userConfig.Spec.NodeTier)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Build map of app params from app config spec
		appParams := make(map[string]string, len(app.AppParams))
		for _, param := range app.AppParams {
			appParams[param.Name] = param.Default
		}

		// Generate session key. Add to AppParams
		if _, ok := appParams["sessionKey"]; !ok {
			appParams["sessionKey"] = broker.MakeSessionKey()
		}

		data := &broker.UserPodData{
			Namespace:                 namespace,
			ProjectID:                 projectID,
			ClientID:                  clientID,
			AppSpec:                   app,
			AppUserConfig:             userConfig.Spec,
			App:                       appName,
			ImageRepo:                 userConfig.Spec.ImageRepo,
			ImageTag:                  userConfig.Spec.ImageTag,
			NodeTier:                  nodeTierSpec,
			Domain:                    domain,
			User:                      user,
			Username:                  username,
			CookieValue:               cookieValue,
			ID:                        id,
			FullName:                  fullName,
			ServiceName:               app.ServiceName,
			Resources:                 []string{},
			Patches:                   []string{},
			JSONPatchesService:        []string{},
			JSONPatchesVirtualService: []string{},
			JSONPatchesDeploy:         []string{},
			UserParams:                userConfig.Spec.Params,
			AppParams:                 appParams,
			SysParams:                 sysParams,
			NetworkPolicyData:         registeredApps.NetworkPolicyData,
			Timestamp:                 ts,
			Region:                    brokerRegion,
			Editable:                  editable,
		}

		appPath := fmt.Sprintf("/%s/", appName)

		// Build user application bundle.
		srcDirApp := path.Join(broker.BundleSourceBaseDir, app.Name)
		if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetApp, srcDirApp, destDir, data); err != nil {
			log.Printf("%v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Build patch that adds list of applied objects to the statefulset.
		if err := broker.GenerateObjectTypePatch(destDir); err != nil {
			log.Printf("%v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Build user namespace template.
		if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetUser, srcDirUser, destDirUser, userNSData); err != nil {
			log.Printf("%v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		if shutdown {
			if _, err := os.Stat(destDir); os.IsNotExist(err) {
				writeResponse(w, http.StatusBadRequest, "shutdown")
				return
			}

			// Shutdown hooks are best effort: a failing hook is logged and the
			// shutdown continues.
			for i, hook := range app.ShutdownHooks {
				selector := strings.Join([]string{"app.kubernetes.io/instance=" + fullName, hook.Selector}, ",")
				log.Printf("executing shutdown hook %d/%d for %s, selector=%s, container=%s, command=%s", i+1, len(app.ShutdownHooks), fullName, selector, hook.Container, hook.Command)
				if err := broker.ExecPodCommand(namespace, selector, hook.Container, hook.Command); err != nil {
					log.Printf("error calling shutdown hook: %v", err)
				}
			}

			// Fetch pod status to retrieve the list of object types.
			status, err := broker.GetPodStatus(namespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName))
			if err != nil {
				log.Printf("failed to get pod status: %v", err)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			if len(status.BrokerObjects) > 0 {
				log.Printf("shutting down %s pod for user: %s", appName, user)
				objectTypes := strings.Join(status.BrokerObjects, ",")
				// Values are passed as positional parameters so they are never
				// interpreted as shell syntax.
				cmd := exec.Command("sh", "-o", "pipefail", "-c",
					`kubectl delete "$1" -n "$2" -l "app.kubernetes.io/instance=$3" --wait=false`,
					"sh", objectTypes, namespace, fullName)
				cmd.Dir = destDir
				stdoutStderr, err := cmd.CombinedOutput()
				if err != nil {
					log.Printf("error calling kubectl for %s: %v\n%s", user, err, stdoutStderr)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}
			}

			// Delete the cookie by setting max-age to -1
			broker.SetCookie(w, cookieName, cookieValue, appPath, -1)

			writeResponse(w, http.StatusAccepted, "terminating")
			return
		}

		if getStatus {
			// Get pod status based on conditions.
			status, err := broker.GetPodStatus(namespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName))
			if err != nil {
				log.Printf("failed to get pod ips: %v", err)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			statusCode := http.StatusOK

			if status.Status == "waiting" {
				statusCode = http.StatusCreated
			}

			if status.Status == "ready" {
				broker.SetCookie(w, cookieName, cookieValue, appPath, maxCookieAgeSeconds)
			}

			if redirectURL, ok := queryParams["r"]; ok {
				// Add header to redirect.
				// NOTE(review): the redirect target is taken verbatim from the "r"
				// query parameter; confirm whether it should be restricted to
				// same-origin paths to avoid an open redirect.
				w.Header().Set("Location", redirectURL)
				statusCode = http.StatusTemporaryRedirect
			}

			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(statusCode)
			if err := json.NewEncoder(w).Encode(status); err != nil {
				log.Printf("failed to write status response: %v", err)
			}

			return
		}

		if create {
			log.Printf("creating pod for user: %s: %s", user, fullName)
			// Apply the user namespace bundle first, then the app bundle. Paths
			// contain the user name, so they are passed as positional parameters.
			cmd := exec.Command("sh", "-o", "pipefail", "-c",
				`kustomize build "$1" | kubectl apply -f - && kustomize build "$2" | kubectl apply -f -`,
				"sh", destDirUser, destDir)
			cmd.Dir = destDir
			stdoutStderr, err := cmd.CombinedOutput()
			if err != nil {
				log.Printf("error calling kubectl for %s: %v\n%s", user, err, stdoutStderr)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			broker.SetCookie(w, cookieName, cookieValue, appPath, maxCookieAgeSeconds)

			writeResponse(w, http.StatusAccepted, "created")
			log.Printf("pod created for user: %s: %s", user, fullName)
		}
	})

	log.Println("Listening on port 8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}