images/controller/cmd/app_publisher/app_publisher.go (231 lines of code) (raw):
/*
Copyright 2019 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httputil"
"os"
"os/exec"
"path"
"strings"
"text/template"
"github.com/Masterminds/sprig"
broker "selkies.io/controller/pkg"
)
type newAppData struct {
Name string `json:"name"`
DisplayName string `json:"displayName"`
Description string `json:"description"`
Icon string `json:"icon"`
Entrypoint string `json:"entrypoint"`
}
type appPublishJobTemplateData struct {
JobName string
Namespace string
AppName string
User string
NodeName string
ContainerID string
ImageTag string
ProjectID string
NewApp newAppData
}
func main() {
log.Printf("Starting broker app publisher service")
// Set from downward API.
namespace := os.Getenv("NAMESPACE")
if len(namespace) == 0 {
log.Fatal("Missing NAMESPACE env.")
}
templatePath := os.Getenv("TEMPLATE_PATH")
if len(templatePath) == 0 {
templatePath = "/run/app-publisher/template/app-publish-job.yaml.tmpl"
}
// Values available to templates from environment variables prefixed with POD_BROKER_PARAM_Name=Value
// Map of Name=Value
sysParams := broker.GetEnvPrefixedVars("POD_BROKER_PARAM_")
// AuthHeader from params
authHeaderName, ok := sysParams["AuthHeader"]
if !ok {
log.Fatal("Missing POD_BROKER_PARAM_AuthHeader env.")
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if _, ok := sysParams["Debug"]; ok {
data, _ := httputil.DumpRequest(r, false)
log.Println(string(data))
}
// Discover apps from their config specs located on the filesystem.
// TODO: look into caching this, large number of apps and http requests can slow down the broker.
registeredApps, err := broker.NewRegisteredAppManifestFromJSON(broker.RegisteredAppsManifestJSONFile, broker.AppTypeStatefulSet)
if err != nil {
log.Printf("failed to parse registered app manifest: %v", err)
writeResponse(w, http.StatusInternalServerError, "internal server error")
return
}
// Extract app name from path
reqApp := strings.Split(r.URL.Path, "/")[1]
// Get app spec from parsed apps.
app, ok := registeredApps.Apps[reqApp]
if !ok {
log.Printf("app not found: %s", reqApp)
writeResponse(w, http.StatusNotFound, fmt.Sprintf("app not found: %s", reqApp))
return
}
appName := app.Name
cookieName := fmt.Sprintf("broker_%s", appName)
// Get user from cookie or header
user := broker.GetUserFromCookieOrAuthHeader(r, cookieName, authHeaderName)
if len(user) == 0 {
writeResponse(w, http.StatusBadRequest, fmt.Sprintf("Failed to get user from cookie or auth header"))
return
}
// IAP uses a prefix of accounts.google.com:email, remove this to just get the email
userToks := strings.Split(user, ":")
user = userToks[len(userToks)-1]
// App is editable if user is in the list of editors.
editable := false
for _, appEditor := range app.Editors {
if appEditor == user {
editable = true
break
}
}
// Return with error if user is unauthorized
if !editable {
writeResponse(w, http.StatusUnauthorized, "user is not authorized to publish")
return
}
getRequest := false
postRequest := false
deleteRequest := false
switch r.Method {
case "POST":
postRequest = true
case "DELETE":
deleteRequest = true
case "GET":
getRequest = true
}
jobName := fmt.Sprintf("app-publish-%s", appName)
currJobs, err := broker.GetJobs(namespace, fmt.Sprintf("app=%s", jobName))
if err != nil {
writeResponse(w, http.StatusInternalServerError, "failed to query image publish jobs")
return
}
// GET request checks status of current publish jobs for this app.
// Responses:
// StatusOK (200): Idle, no job currently running.
// StatusCreated (201): Job is currently running.
if getRequest {
if len(currJobs) == 0 {
writeResponse(w, http.StatusOK, "no active jobs.")
return
}
writeResponse(w, http.StatusCreated, "image publish job is running")
return
}
// POST request creates new job to publish app from existing container.
if postRequest {
newApp, err := parseNewAppData(r, app)
if err != nil {
writeResponse(w, http.StatusBadRequest, fmt.Sprintf("failed to parse new app data: %v", err))
return
}
// Error if job is already running.
if len(currJobs) > 0 {
writeResponse(w, http.StatusTooManyRequests, "app publish job is already running")
return
}
id := broker.MakePodID(user)
userNamespace := fmt.Sprintf("user-%s", id)
fullName := fmt.Sprintf("%s-%s", appName, id)
// Get current status of app pod.
podStatus, err := broker.GetPodStatus(userNamespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName))
if err != nil {
log.Printf("failed to get pod ips: %v", err)
writeResponse(w, http.StatusInternalServerError, "internal server error")
return
}
if podStatus.Status != "ready" {
writeResponse(w, http.StatusBadRequest, "pod is not ready")
return
}
if len(podStatus.Nodes) == 0 {
writeResponse(w, http.StatusNotFound, fmt.Sprintf("no pods found matching: %s", fullName))
return
}
if len(podStatus.Nodes) != 1 {
log.Printf("Found more than one node for pod instance %s", fullName)
writeResponse(w, http.StatusBadRequest, "failed to locate pod on single node to publish from")
return
}
nodeName := podStatus.Nodes[0]
containerID := strings.ReplaceAll(podStatus.Containers["desktop"], "docker://", "")
image := fmt.Sprintf("gcr.io/%s/vdi-%s:latest", sysParams["ProjectID"], newApp.Name)
if err := makeAppPublishJob(namespace, jobName, appName, image, nodeName, containerID, user, sysParams["ProjectID"], templatePath, newApp); err != nil {
log.Printf("failed to create job: %v", err)
writeResponse(w, http.StatusInternalServerError, "failed to create job")
return
}
writeResponse(w, http.StatusCreated, fmt.Sprintf("Created app publish job: %s", jobName))
return
}
// DELETE request will delete the job.
if deleteRequest {
writeResponse(w, http.StatusBadRequest, "NTI")
return
}
})
log.Println("Listening on port 8081")
log.Fatal(http.ListenAndServe(":8081", nil))
}
func writeResponse(w http.ResponseWriter, statusCode int, message string) {
status := broker.StatusResponse{
Code: statusCode,
Status: message,
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(status)
}
func parseNewAppData(r *http.Request, oldApp broker.AppConfigSpec) (newAppData, error) {
resp := newAppData{}
// Read JSON body
if r.Header.Get("content-type") != "application/json" {
return resp, fmt.Errorf("invalid content-type, expected: application/json")
}
err := json.NewDecoder(r.Body).Decode(&resp)
if err != nil {
return resp, err
}
if len(resp.Name) == 0 {
return resp, fmt.Errorf("missing 'name'")
}
if len(resp.DisplayName) == 0 {
// Default to match name
resp.DisplayName = resp.Name
}
if len(resp.Description) == 0 {
// Default to old description.
resp.Description = oldApp.Description
}
if len(resp.Icon) == 0 {
// Default to old icon
resp.Icon = oldApp.Icon
}
return resp, nil
}
func makeAppPublishJob(namespace, jobName, appName, image, nodeName, containerID, user, projectID, templatePath string, newApp newAppData) error {
log.Printf("creating app publish job: %s, %s, %s", jobName, image, nodeName)
data := appPublishJobTemplateData{
JobName: jobName,
Namespace: namespace,
AppName: appName,
User: user,
NodeName: nodeName,
ContainerID: containerID,
ImageTag: image,
ProjectID: projectID,
NewApp: newApp,
}
destDir := path.Join("/run/app-publisher", jobName)
if err := os.MkdirAll(destDir, os.ModePerm); err != nil {
return fmt.Errorf("failed to make destDir %s: %v", destDir, err)
}
base := path.Base(templatePath)
t, err := template.New(base).Funcs(sprig.TxtFuncMap()).ParseFiles(templatePath)
if err != nil {
return fmt.Errorf("failed to initialize template: %v", err)
}
dest, _ := os.Create(strings.ReplaceAll(path.Join(destDir, base), ".tmpl", ""))
if err != nil {
return fmt.Errorf("failed to create dest template file: %v", err)
}
if err = t.Execute(dest, &data); err != nil {
return fmt.Errorf("failed to execute template: %v", err)
}
// Apply the job to the cluster
cmd := exec.Command("sh", "-c", fmt.Sprintf("kubectl apply -f %s 1>&2", destDir))
cmd.Dir = path.Dir(destDir)
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("error calling kubectl to apply job: %v\n%s", err, string(stdoutStderr))
}
return nil
}