in images/controller/cmd/app_publisher/app_publisher.go [55:237]
func main() {
log.Printf("Starting broker app publisher service")
// Set from downward API.
namespace := os.Getenv("NAMESPACE")
if len(namespace) == 0 {
log.Fatal("Missing NAMESPACE env.")
}
templatePath := os.Getenv("TEMPLATE_PATH")
if len(templatePath) == 0 {
templatePath = "/run/app-publisher/template/app-publish-job.yaml.tmpl"
}
// Values available to templates from environment variables prefixed with POD_BROKER_PARAM_Name=Value
// Map of Name=Value
sysParams := broker.GetEnvPrefixedVars("POD_BROKER_PARAM_")
// AuthHeader from params
authHeaderName, ok := sysParams["AuthHeader"]
if !ok {
log.Fatal("Missing POD_BROKER_PARAM_AuthHeader env.")
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if _, ok := sysParams["Debug"]; ok {
data, _ := httputil.DumpRequest(r, false)
log.Println(string(data))
}
// Discover apps from their config specs located on the filesystem.
// TODO: look into caching this, large number of apps and http requests can slow down the broker.
registeredApps, err := broker.NewRegisteredAppManifestFromJSON(broker.RegisteredAppsManifestJSONFile, broker.AppTypeStatefulSet)
if err != nil {
log.Printf("failed to parse registered app manifest: %v", err)
writeResponse(w, http.StatusInternalServerError, "internal server error")
return
}
// Extract app name from path
reqApp := strings.Split(r.URL.Path, "/")[1]
// Get app spec from parsed apps.
app, ok := registeredApps.Apps[reqApp]
if !ok {
log.Printf("app not found: %s", reqApp)
writeResponse(w, http.StatusNotFound, fmt.Sprintf("app not found: %s", reqApp))
return
}
appName := app.Name
cookieName := fmt.Sprintf("broker_%s", appName)
// Get user from cookie or header
user := broker.GetUserFromCookieOrAuthHeader(r, cookieName, authHeaderName)
if len(user) == 0 {
writeResponse(w, http.StatusBadRequest, fmt.Sprintf("Failed to get user from cookie or auth header"))
return
}
// IAP uses a prefix of accounts.google.com:email, remove this to just get the email
userToks := strings.Split(user, ":")
user = userToks[len(userToks)-1]
// App is editable if user is in the list of editors.
editable := false
for _, appEditor := range app.Editors {
if appEditor == user {
editable = true
break
}
}
// Return with error if user is unauthorized
if !editable {
writeResponse(w, http.StatusUnauthorized, "user is not authorized to publish")
return
}
getRequest := false
postRequest := false
deleteRequest := false
switch r.Method {
case "POST":
postRequest = true
case "DELETE":
deleteRequest = true
case "GET":
getRequest = true
}
jobName := fmt.Sprintf("app-publish-%s", appName)
currJobs, err := broker.GetJobs(namespace, fmt.Sprintf("app=%s", jobName))
if err != nil {
writeResponse(w, http.StatusInternalServerError, "failed to query image publish jobs")
return
}
// GET request checks status of current publish jobs for this app.
// Responses:
// StatusOK (200): Idle, no job currently running.
// StatusCreated (201): Job is currently running.
if getRequest {
if len(currJobs) == 0 {
writeResponse(w, http.StatusOK, "no active jobs.")
return
}
writeResponse(w, http.StatusCreated, "image publish job is running")
return
}
// POST request creates new job to publish app from existing container.
if postRequest {
newApp, err := parseNewAppData(r, app)
if err != nil {
writeResponse(w, http.StatusBadRequest, fmt.Sprintf("failed to parse new app data: %v", err))
return
}
// Error if job is already running.
if len(currJobs) > 0 {
writeResponse(w, http.StatusTooManyRequests, "app publish job is already running")
return
}
id := broker.MakePodID(user)
userNamespace := fmt.Sprintf("user-%s", id)
fullName := fmt.Sprintf("%s-%s", appName, id)
// Get current status of app pod.
podStatus, err := broker.GetPodStatus(userNamespace, fmt.Sprintf("app.kubernetes.io/instance=%s,app=%s", fullName, app.ServiceName))
if err != nil {
log.Printf("failed to get pod ips: %v", err)
writeResponse(w, http.StatusInternalServerError, "internal server error")
return
}
if podStatus.Status != "ready" {
writeResponse(w, http.StatusBadRequest, "pod is not ready")
return
}
if len(podStatus.Nodes) == 0 {
writeResponse(w, http.StatusNotFound, fmt.Sprintf("no pods found matching: %s", fullName))
return
}
if len(podStatus.Nodes) != 1 {
log.Printf("Found more than one node for pod instance %s", fullName)
writeResponse(w, http.StatusBadRequest, "failed to locate pod on single node to publish from")
return
}
nodeName := podStatus.Nodes[0]
containerID := strings.ReplaceAll(podStatus.Containers["desktop"], "docker://", "")
image := fmt.Sprintf("gcr.io/%s/vdi-%s:latest", sysParams["ProjectID"], newApp.Name)
if err := makeAppPublishJob(namespace, jobName, appName, image, nodeName, containerID, user, sysParams["ProjectID"], templatePath, newApp); err != nil {
log.Printf("failed to create job: %v", err)
writeResponse(w, http.StatusInternalServerError, "failed to create job")
return
}
writeResponse(w, http.StatusCreated, fmt.Sprintf("Created app publish job: %s", jobName))
return
}
// DELETE request will delete the job.
if deleteRequest {
writeResponse(w, http.StatusBadRequest, "NTI")
return
}
})
log.Println("Listening on port 8081")
log.Fatal(http.ListenAndServe(":8081", nil))
}