in images/controller/cmd/pod_broker/pod_broker.go [41:673]
// main configures the pod broker from environment variables and instance
// metadata, registers the single catch-all HTTP handler and serves on :8080.
//
// Required environment: OAUTH_CLIENT_ID, POD_BROKER_PARAM_Domain,
// POD_BROKER_PARAM_AuthHeader and POD_BROKER_PARAM_AuthorizedUserRepoPattern;
// the process exits when any of them is missing. COOKIE_SECRET and the Title,
// Theme, UsernameHeader and Debug params are optional.
//
// Routes served by the handler (the first path segment selects the app):
//
//	/                 JSON list of the apps visible to the requesting user.
//	/<app>/metadata   JSON with the first pod IP, first session key and the user.
//	/<app>/config     GET returns the user config spec, POST validates and applies a new one.
//	/<app>/...        GET reports pod status, POST creates the pod, DELETE shuts it down.
func main() {
	cookieSecret := os.Getenv("COOKIE_SECRET")
	if len(cookieSecret) == 0 {
		// Generate random secret
		// NOTE(review): the fallback hashes a rand.Intn value below 10000 and the
		// current Unix time, so it is guessable. Set COOKIE_SECRET explicitly or
		// switch this to crypto/rand.
		log.Printf("no COOKIE_SECRET env var found, generating random secret value")
		h := sha1.New()
		io.WriteString(h, fmt.Sprintf("%d.%d", rand.Intn(10000), int32(time.Now().Unix())))
		cookieSecret = fmt.Sprintf("%x", h.Sum(nil))
	}

	clientID := os.Getenv("OAUTH_CLIENT_ID")
	if len(clientID) == 0 {
		log.Fatalf("missing env, OAUTH_CLIENT_ID")
	}

	// Values available to templates from environment variables prefixed with POD_BROKER_PARAM_Name=Value
	// Map of Name=Value
	sysParams := broker.GetEnvPrefixedVars("POD_BROKER_PARAM_")

	// Project ID from instance metadata
	projectID, err := broker.GetProjectID()
	if err != nil {
		log.Fatalf("failed to determine project ID: %v", err)
	}

	// Region from instance metadata
	brokerRegion, err := broker.GetInstanceRegion()
	if err != nil {
		log.Fatalf("failed to determine broker region: %v", err)
	}

	// Title from params
	brokerName, ok := sysParams["Title"]
	if !ok {
		brokerName = "App Launcher"
		log.Printf("using default broker title: %s", brokerName)
	}

	// Theme from params
	brokerTheme, ok := sysParams["Theme"]
	if !ok {
		brokerTheme = "light"
		log.Printf("using default broker theme: %s", brokerTheme)
	}

	// Domain from params
	domain, ok := sysParams["Domain"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_Domain env.")
	}

	// AuthHeader from params
	authHeaderName, ok := sysParams["AuthHeader"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_AuthHeader env.")
	}

	// Optional; empty when the param is not set.
	usernameHeader := sysParams["UsernameHeader"]

	// Authorized user image repo pattern regexp.
	allowedRepoPatternParam, ok := sysParams["AuthorizedUserRepoPattern"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_AuthorizedUserRepoPattern env.")
	}
	// The pattern comes from the environment, so report a bad value as a
	// startup error instead of panicking.
	allowedRepoPattern, err := regexp.Compile(allowedRepoPatternParam)
	if err != nil {
		log.Fatalf("invalid POD_BROKER_PARAM_AuthorizedUserRepoPattern regexp: %v", err)
	}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if _, ok := sysParams["Debug"]; ok {
			if data, err := httputil.DumpRequest(r, false); err != nil {
				log.Printf("failed to dump request: %v", err)
			} else {
				log.Println(string(data))
			}
		}

		// Discover apps from their config specs located on the filesystem.
		// TODO: look into caching this, large number of apps and http requests can slow down the broker.
		registeredApps, err := broker.NewRegisteredAppManifestFromJSON(broker.RegisteredAppsManifestJSONFile, broker.AppTypeAll)
		if err != nil {
			log.Printf("failed to parse registered app manifest: %v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Extract app name from path. The length check keeps a path that
		// contains no "/" from panicking on the index below.
		pathToks := strings.Split(r.URL.Path, "/")
		if len(pathToks) < 2 {
			writeResponse(w, http.StatusBadRequest, "invalid request path")
			return
		}
		reqApp := pathToks[1]

		if r.URL.Path == "/" {
			// Get user from header. At this time the per-app cookie has not been set and is not required.
			user := broker.GetUserFromCookieOrAuthHeader(r, "", authHeaderName)
			if len(user) == 0 {
				writeResponse(w, http.StatusBadRequest, "Failed to get user from auth header")
				return
			}

			// IAP uses a prefix of accounts.google.com:email, remove this to just get the email
			user = user[strings.LastIndex(user, ":")+1:]

			username := broker.GetUsernameFromHeaderOrDefault(r, usernameHeader, user)

			// Return list of apps
			appList := broker.AppListResponse{
				BrokerName:   brokerName,
				BrokerTheme:  brokerTheme,
				BrokerRegion: brokerRegion,
				Apps:         make([]broker.AppDataResponse, 0),
				User:         user,
			}

			for _, app := range registeredApps.Apps {
				if app.UserParams == nil {
					// default user params to empty list.
					app.UserParams = make([]broker.AppConfigParam, 0)
				}

				if len(app.LaunchURL) == 0 {
					// default launch url to prefixed path.
					app.LaunchURL = fmt.Sprintf("/%s/", app.Name)
				}

				// App is editable if user is in the list of editors. Both the
				// user and the username are matched, the same as the per-app
				// check further down.
				editable := false
				for _, appEditor := range app.Editors {
					re, err := regexp.Compile(appEditor)
					if err != nil {
						log.Printf("failed to parse app editor as regexp: '%s'.", appEditor)
						continue
					}
					if re.MatchString(user) || re.MatchString(username) {
						editable = true
						break
					}
				}

				// Filter app by authorizedUsers if present.
				if len(app.AuthorizedUsers) > 0 {
					found := false
					for _, u := range app.AuthorizedUsers {
						re, err := regexp.Compile(u)
						if err != nil {
							// An invalid pattern aborts the scan, so found
							// stays false and the app is left out of the list.
							log.Printf("failed to parse authorizedUser as regexp: '%s', skipping app.", u)
							break
						}
						if re.MatchString(user) || re.MatchString(username) {
							found = true
							break
						}
					}
					if !found {
						// Skip this app.
						continue
					}
				}

				appData := broker.AppDataResponse{
					Name:           app.Name,
					Type:           app.Type,
					DisplayName:    app.DisplayName,
					Description:    app.Description,
					Icon:           app.Icon,
					LaunchURL:      app.LaunchURL,
					DefaultRepo:    app.DefaultRepo,
					DefaultTag:     app.DefaultTag,
					Params:         app.UserParams,
					DefaultTier:    app.DefaultTier,
					NodeTiers:      app.NodeTierNames(),
					Editable:       editable,
					DisableOptions: app.DisableOptions,
				}
				appList.Apps = append(appList.Apps, appData)
			}

			w.Header().Set("Content-Type", "application/json")
			if err := json.NewEncoder(w).Encode(appList); err != nil {
				log.Printf("failed to write app list response: %v", err)
			}
			return
		}

		// Get app spec from parsed apps.
		app, ok := registeredApps.Apps[reqApp]
		if !ok {
			log.Printf("app not found: %s", reqApp)
			writeResponse(w, http.StatusNotFound, "app not found")
			return
		}

		// Apps of any other type are redirected to the reservation broker path.
		if app.Type != broker.AppTypeStatefulSet {
			w.Header().Set("Location", fmt.Sprintf("/reservation-broker/%s/", app.Name))
			writeResponse(w, http.StatusFound, "app is reservation type")
			return
		}

		appName := app.Name
		cookieName := fmt.Sprintf("broker_%s", appName)

		// The HTTP method selects the action; any other method leaves all three false.
		create := r.Method == http.MethodPost
		shutdown := r.Method == http.MethodDelete
		getStatus := r.Method == http.MethodGet

		// Get user from cookie or header
		user := broker.GetUserFromCookieOrAuthHeader(r, cookieName, authHeaderName)
		if len(user) == 0 {
			writeResponse(w, http.StatusBadRequest, "Failed to get user from cookie or auth header")
			return
		}

		// IAP uses a prefix of accounts.google.com:email, remove this to just get the email
		user = user[strings.LastIndex(user, ":")+1:]

		username := broker.GetUsernameFromHeaderOrDefault(r, usernameHeader, user)

		// App is editable if user is in the list of editors.
		editable := false
		for _, appEditor := range app.Editors {
			re, err := regexp.Compile(appEditor)
			if err != nil {
				log.Printf("failed to parse app editor as regexp: '%s'.", appEditor)
				continue
			}
			if re.MatchString(user) || re.MatchString(username) {
				editable = true
				break
			}
		}

		// Check per-app user authorization if present.
		if len(app.AuthorizedUsers) > 0 {
			found := false
			for _, u := range app.AuthorizedUsers {
				re, err := regexp.Compile(u)
				if err != nil {
					// An invalid pattern aborts the scan, so the request is denied.
					log.Printf("failed to parse authorizedUser as regexp: '%s', skipping app.", u)
					break
				}
				if re.MatchString(user) || re.MatchString(username) {
					found = true
					break
				}
			}
			if !found {
				writeResponse(w, http.StatusUnauthorized, "user is not authorized")
				return
			}
		}

		// Compute pod ID from user and app, must conform to DNS-1035.
		id := broker.MakePodID(user)
		namespace := fmt.Sprintf("user-%s", id)
		fullName := fmt.Sprintf("%s-%s", appName, id)
		podSelector := fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName)

		// NOTE(review): user is used verbatim as a path component below; confirm
		// the auth layer guarantees it cannot contain path separators.
		destDir := path.Join(broker.BuildSourceBaseDir, user, appName)
		cookieValue := broker.MakeCookieValue(user, cookieSecret)
		userConfigFile := path.Join(broker.AppUserConfigBaseDir, appName, user, broker.AppUserConfigJSONFile)

		ts := fmt.Sprintf("%d", time.Now().Unix())

		// Fetch user config, only use user options if spec.disableOptions is false.
		userConfig, err := broker.GetAppUserConfig(userConfigFile)
		if app.DisableOptions || err != nil {
			// config does not exist yet, generate default.
			defaultAppParams := make(map[string]string, len(app.UserParams))
			for _, param := range app.UserParams {
				defaultAppParams[param.Name] = param.Default
			}

			// Create new user config field with default spec
			userConfig = broker.NewAppUserConfig(fullName, namespace, broker.AppUserConfigSpec{
				AppName:   appName,
				User:      user,
				ImageRepo: app.DefaultRepo,
				ImageTag:  app.DefaultTag,
				Tags:      []string{app.DefaultTag},
				NodeTier:  app.DefaultTier,
				Params:    defaultAppParams,
			})
		}

		userNSData := &broker.UserPodData{
			Namespace: namespace,
			ProjectID: projectID,
			AppSpec:   app,
			User:      user,
			Timestamp: ts,
		}

		srcDirUser := path.Join(broker.UserBundleSourceBaseDir, appName)
		destDirUser := path.Join(broker.BuildSourceBaseDirNS, user)

		// Request path with at most one trailing slash removed. A literal suffix
		// comparison on it detects the per-app sub-resources without compiling a
		// regexp from the app name on every request.
		trimmedPath := strings.TrimSuffix(r.URL.Path, "/")

		// Handle requests for per-app metadata requests
		if strings.HasSuffix(trimmedPath, appName+"/metadata") {
			// Fetch pod status
			status, err := broker.GetPodStatus(namespace, podSelector)
			if err != nil {
				log.Printf("failed to get pod status: %v", err)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			// Only the first IP and session key are reported; both stay empty
			// when the status has none.
			ip := ""
			sessionKey := ""
			if len(status.PodIPs) > 0 {
				ip = status.PodIPs[0]
			}
			if len(status.SessionKeys) > 0 {
				sessionKey = status.SessionKeys[0]
			}

			metadata := broker.ReservationMetadataSpec{
				IP:         ip,
				SessionKey: sessionKey,
				User:       user,
			}

			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			if err := json.NewEncoder(w).Encode(metadata); err != nil {
				log.Printf("failed to write metadata response: %v", err)
			}
			return
		}

		// Handle requests for per-app user configs
		if strings.HasSuffix(trimmedPath, appName+"/config") {
			switch {
			case getStatus:
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				if err := json.NewEncoder(w).Encode(userConfig.Spec); err != nil {
					log.Printf("failed to write user config response: %v", err)
				}

			case create:
				// Read JSON body
				if r.Header.Get("content-type") != "application/json" {
					writeResponse(w, http.StatusBadRequest, "invalid content-type")
					return
				}

				// Verify config is allowed to be set per broker app config.
				if app.DisableOptions {
					writeResponse(w, http.StatusBadRequest, "user config cannot be modified at this time")
					return
				}

				var inputConfigSpec broker.AppUserConfigSpec
				if err := json.NewDecoder(r.Body).Decode(&inputConfigSpec); err != nil {
					writeResponse(w, http.StatusBadRequest, "invalid app user config")
					return
				}

				// Overwrite immutable fields.
				inputConfigSpec.AppName = appName
				inputConfigSpec.User = user
				inputConfigSpec.Tags = userConfig.Spec.Tags

				if len(inputConfigSpec.ImageRepo) == 0 {
					writeResponse(w, http.StatusBadRequest, "missing config field: imageRepo")
					return
				}
				if len(inputConfigSpec.ImageTag) == 0 {
					writeResponse(w, http.StatusBadRequest, "missing config field: imageTag")
					return
				}
				if len(inputConfigSpec.NodeTier) == 0 {
					writeResponse(w, http.StatusBadRequest, "missing config field: nodeTier")
					return
				}

				foundTier := false
				for _, tier := range app.NodeTiers {
					if inputConfigSpec.NodeTier == tier.Name {
						foundTier = true
						break
					}
				}
				if !foundTier {
					writeResponse(w, http.StatusBadRequest, fmt.Sprintf("invalid node tier: %s", inputConfigSpec.NodeTier))
					return
				}

				// Verify image repo and tag exists if it was changed.
				if inputConfigSpec.ImageRepo != userConfig.Spec.ImageRepo || inputConfigSpec.ImageTag != userConfig.Spec.ImageTag {
					log.Printf("validating user image repo: %s:%s", inputConfigSpec.ImageRepo, inputConfigSpec.ImageTag)
					if err := broker.ValidateImageRepo(inputConfigSpec.ImageRepo, inputConfigSpec.ImageTag, allowedRepoPattern); err != nil {
						log.Printf("user %s config image validation failed: %v", user, err)
						writeResponse(w, http.StatusBadRequest, fmt.Sprintf("%v", err))
						return
					}
				}

				// Verify parameters are valid for this app.
				for paramName := range inputConfigSpec.Params {
					found := false
					for _, supportedParamName := range app.UserParams {
						if paramName == supportedParamName.Name {
							found = true
							break
						}
					}
					if !found {
						msg := fmt.Sprintf("user %s config validation failed: invalid parameter '%s'", user, paramName)
						// msg embeds request data, so it must not be used as a format string.
						log.Print(msg)
						writeResponse(w, http.StatusBadRequest, msg)
						return
					}
				}

				// Set user config spec to validated input spec.
				userConfig.Spec = inputConfigSpec

				// Save config to local file.
				if err := userConfig.WriteJSON(userConfigFile); err != nil {
					log.Printf("failed to save copy of user config: %v", err)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}

				// Build user namespace template.
				if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetUser, srcDirUser, destDirUser, userNSData); err != nil {
					log.Printf("%v", err)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}

				// Apply config to cluster. The paths contain the user name, so
				// they are handed to the shell as quoted positional parameters
				// instead of being spliced into the command text.
				cmd := exec.Command("sh", "-o", "pipefail", "-c",
					`kustomize build "$1" | kubectl apply -f - && kubectl apply -f "$2"`,
					"sh", destDirUser, userConfigFile)
				cmd.Dir = path.Dir(destDir)
				stdoutStderr, err := cmd.CombinedOutput()
				if err != nil {
					log.Printf("error calling kubectl to apply user config for %s: %v\n%s", user, err, stdoutStderr)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}

				writeResponse(w, http.StatusOK, "user config updated")

			default:
				writeResponse(w, http.StatusBadRequest, "only POST method is supported.")
			}
			return
		}

		// Extract query parameters
		// Note that only the first instance of a repeated query param is used.
		query := r.URL.Query()
		queryParams := make(map[string]string, len(query))
		for k, v := range query {
			queryParams[k] = v[0]
		}

		// Map named tier to NodeTierSpec for pod template.
		// Spec contains node affinity labels and resources used in pod templates.
		var nodeTierSpec broker.NodeTierSpec
		found := false
		for _, tier := range app.NodeTiers {
			if tier.Name == userConfig.Spec.NodeTier {
				nodeTierSpec = tier
				found = true
				break
			}
		}
		if !found {
			log.Printf("failed to map config tier '%s' to NodeTierSpec", userConfig.Spec.NodeTier)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Build map of app params from app config spec
		appParams := make(map[string]string, len(app.AppParams))
		for _, param := range app.AppParams {
			appParams[param.Name] = param.Default
		}

		// Generate session key. Add to AppParams
		if _, ok := appParams["sessionKey"]; !ok {
			appParams["sessionKey"] = broker.MakeSessionKey()
		}

		data := &broker.UserPodData{
			Namespace:                 namespace,
			ProjectID:                 projectID,
			ClientID:                  clientID,
			AppSpec:                   app,
			AppUserConfig:             userConfig.Spec,
			App:                       appName,
			ImageRepo:                 userConfig.Spec.ImageRepo,
			ImageTag:                  userConfig.Spec.ImageTag,
			NodeTier:                  nodeTierSpec,
			Domain:                    domain,
			User:                      user,
			Username:                  username,
			CookieValue:               cookieValue,
			ID:                        id,
			FullName:                  fullName,
			ServiceName:               app.ServiceName,
			Resources:                 []string{},
			Patches:                   []string{},
			JSONPatchesService:        []string{},
			JSONPatchesVirtualService: []string{},
			JSONPatchesDeploy:         []string{},
			UserParams:                userConfig.Spec.Params,
			AppParams:                 appParams,
			SysParams:                 sysParams,
			NetworkPolicyData:         registeredApps.NetworkPolicyData,
			Timestamp:                 ts,
			Region:                    brokerRegion,
			Editable:                  editable,
		}

		appPath := fmt.Sprintf("/%s/", appName)

		// Build user application bundle.
		srcDirApp := path.Join(broker.BundleSourceBaseDir, app.Name)
		if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetApp, srcDirApp, destDir, data); err != nil {
			log.Printf("%v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Build patch that adds list of applied objects to the statefulset.
		if err := broker.GenerateObjectTypePatch(destDir); err != nil {
			log.Printf("%v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		// Build user namespace template.
		if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirStatefulSetUser, srcDirUser, destDirUser, userNSData); err != nil {
			log.Printf("%v", err)
			writeResponse(w, http.StatusInternalServerError, "internal server error")
			return
		}

		if shutdown {
			if _, err := os.Stat(destDir); os.IsNotExist(err) {
				writeResponse(w, http.StatusBadRequest, "shutdown")
				return
			}

			// Shutdown hooks are best effort: a failing hook is logged and the
			// shutdown continues.
			for i, hook := range app.ShutdownHooks {
				selector := strings.Join([]string{"app.kubernetes.io/instance=" + fullName, hook.Selector}, ",")
				log.Printf("executing shutdown hook %d/%d for %s, selector=%s, container=%s, command=%s", i+1, len(app.ShutdownHooks), fullName, selector, hook.Container, hook.Command)
				if err := broker.ExecPodCommand(namespace, selector, hook.Container, hook.Command); err != nil {
					log.Printf("error calling shutdown hook: %v", err)
				}
			}

			// Fetch pod status to retrieve the list of object types.
			status, err := broker.GetPodStatus(namespace, podSelector)
			if err != nil {
				log.Printf("failed to get pod status: %v", err)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			if len(status.BrokerObjects) > 0 {
				log.Printf("shutting down %s pod for user: %s", appName, user)
				objectTypes := strings.Join(status.BrokerObjects, ",")
				// Values are passed as quoted positional parameters so they
				// are never interpreted as shell syntax.
				cmd := exec.Command("sh", "-o", "pipefail", "-c",
					`kubectl delete "$1" -n "$2" -l "app.kubernetes.io/instance=$3" --wait=false`,
					"sh", objectTypes, namespace, fullName)
				cmd.Dir = destDir
				stdoutStderr, err := cmd.CombinedOutput()
				if err != nil {
					log.Printf("error calling kubectl for %s: %v\n%s", user, err, stdoutStderr)
					writeResponse(w, http.StatusInternalServerError, "internal server error")
					return
				}
			}

			// Delete the cookie by setting max-age to -1
			broker.SetCookie(w, cookieName, cookieValue, appPath, -1)

			writeResponse(w, http.StatusAccepted, "terminating")
			return
		}

		if getStatus {
			// Get pod status based on conditions.
			status, err := broker.GetPodStatus(namespace, podSelector)
			if err != nil {
				log.Printf("failed to get pod ips: %v", err)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			statusCode := http.StatusOK
			if status.Status == "waiting" {
				statusCode = http.StatusCreated
			}

			if status.Status == "ready" {
				broker.SetCookie(w, cookieName, cookieValue, appPath, maxCookieAgeSeconds)
			}

			if redirectURL, ok := queryParams["r"]; ok {
				// Add header to redirect.
				// NOTE(review): the target is taken unvalidated from the "r" query
				// parameter; consider restricting it to same-origin paths.
				w.Header().Set("Location", redirectURL)
				statusCode = http.StatusTemporaryRedirect
			}

			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(statusCode)
			if err := json.NewEncoder(w).Encode(status); err != nil {
				log.Printf("failed to write status response: %v", err)
			}
			return
		}

		if create {
			log.Printf("creating pod for user: %s: %s", user, fullName)
			// Paths contain the user name, so they are passed as quoted
			// positional parameters instead of being spliced into the command.
			cmd := exec.Command("sh", "-o", "pipefail", "-c",
				`kustomize build "$1" | kubectl apply -f - && kustomize build "$2" | kubectl apply -f -`,
				"sh", destDirUser, destDir)
			cmd.Dir = destDir
			stdoutStderr, err := cmd.CombinedOutput()
			if err != nil {
				log.Printf("error calling kubectl for %s: %v\n%s", user, err, stdoutStderr)
				writeResponse(w, http.StatusInternalServerError, "internal server error")
				return
			}

			broker.SetCookie(w, cookieName, cookieValue, appPath, maxCookieAgeSeconds)
			writeResponse(w, http.StatusAccepted, "created")
			log.Printf("pod created for user: %s: %s", user, fullName)
		}
		// Any other method falls through here without an explicit response.
	})

	log.Println("Listening on port 8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}