in images/controller/cmd/reservation_broker/reservation_broker.go [86:354]
// main is the entry point of the reservation broker.
//
// It reads configuration from environment variables and from the broker
// helpers, starts a background loop that builds and applies the manifests of
// every registered app (and prunes apps that are no longer registered),
// registers the "metadata" route and finally starts the HTTP server.
func main() {
	cookieSecret := os.Getenv("COOKIE_SECRET")
	if len(cookieSecret) == 0 {
		// Generate random secret
		// NOTE(review): the secret is derived from a number below 10000 and the
		// current Unix time, which is a small search space. If rand is math/rand
		// (TODO confirm against the import block) consider a cryptographically
		// secure source instead.
		log.Printf("no COOKIE_SECRET env var found, generating random secret value")
		h := sha1.New()
		io.WriteString(h, fmt.Sprintf("%d.%d", rand.Intn(10000), int32(time.Now().Unix())))
		cookieSecret = fmt.Sprintf("%x", h.Sum(nil))
	}

	clientID := os.Getenv("OAUTH_CLIENT_ID")
	if len(clientID) == 0 {
		log.Fatalf("missing env, OAUTH_CLIENT_ID")
	}

	// Project ID from instance metadata
	projectID, err := broker.GetProjectID()
	if err != nil {
		log.Fatalf("failed to determine project ID: %v", err)
	}

	// Region from instance metadata
	brokerRegion, err := broker.GetInstanceRegion()
	if err != nil {
		log.Fatalf("failed to determine broker region: %v", err)
	}

	// Values available to templates from environment variables prefixed with POD_BROKER_PARAM_Name=Value
	// Map of Name=Value
	sysParams := broker.GetEnvPrefixedVars("POD_BROKER_PARAM_")

	// Domain from params
	domain, ok := sysParams["Domain"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_Domain env.")
	}

	// AuthHeader from params
	authHeaderName, ok := sysParams["AuthHeader"]
	if !ok {
		log.Fatal("Missing POD_BROKER_PARAM_AuthHeader env.")
	}

	// UsernameHeader is optional; it is left at its zero value when not set.
	usernameHeader := sysParams["UsernameHeader"]

	// Period which to scan for apps
	scanPeriod := 5 * time.Second

	// Period which to re-apply manifests
	resyncPeriod := 60 * time.Second

	// Map of cached app manifest checksums
	manifestChecksums := make(map[string]string)

	// Muxed server to handle per-app routes.
	server := &Server{
		Port:       "8082",
		Dispatcher: mux.NewRouter(),
		Urls:       make(map[string]func(w http.ResponseWriter, r *http.Request)),
	}

	// Map of app contexts.
	// NOTE(review): this map is written by the sync loop below and read by the
	// metadata handler; no synchronization is visible in this function. Confirm
	// whether the handler can run concurrently with the loop and guard the map
	// if so.
	appContexts := make(map[string]*AppContext)

	// Sync loop for app resources
	go func() {
		lastSync := time.Now()
		for {
			// Discover apps from their config specs located on the filesystem.
			// TODO: look into caching this, large number of apps and http requests can slow down the broker.
			registeredApps, err := broker.NewRegisteredAppManifestFromJSON(broker.RegisteredAppsManifestJSONFile, broker.AppTypeDeployment)
			if err != nil {
				log.Printf("failed to parse registered app manifest: %v", err)
				time.Sleep(2 * time.Second)
				continue
			}

			// Decide once per scan whether this pass is a periodic resync.
			// Resetting lastSync inside the per-app loop would let the first
			// unchanged app consume the resync and leave every other app
			// without its periodic re-apply.
			resync := time.Since(lastSync) >= resyncPeriod
			if resync {
				lastSync = time.Now()
			}

			for _, app := range registeredApps.Apps {
				if len(app.Deployment.Selector) == 0 {
					log.Printf("error app is missing deployment.selector: %s, skipping", app.Name)
					continue
				}

				// Common variables
				id := broker.MakePodID(app.Name)
				fullName := fmt.Sprintf("%s-%s", app.Name, id)
				namespace := app.Name
				ts := fmt.Sprintf("%d", time.Now().Unix())

				// Verify that the DefaultTier is in the list of NodeTiers and use it.
				var nodeTierSpec broker.NodeTierSpec
				found := false
				for _, tier := range app.NodeTiers {
					if tier.Name == app.DefaultTier {
						nodeTierSpec = tier
						found = true
						break
					}
				}
				if !found {
					log.Printf("Default tier '%s' not found in list of app node tiers", app.DefaultTier)
					continue
				}

				// Build map of app params from app config spec
				appParams := make(map[string]string)
				for _, param := range app.AppParams {
					appParams[param.Name] = param.Default
				}

				// Path to write compiled template output to.
				destDir := path.Join(broker.BuildSourceBaseDir, app.Name)

				// Create template data
				data := &broker.UserPodData{
					Namespace:                 namespace,
					ProjectID:                 projectID,
					ClientID:                  clientID,
					AppSpec:                   app,
					App:                       app.Name,
					ImageRepo:                 app.DefaultRepo,
					ImageTag:                  app.DefaultTag,
					NodeTier:                  nodeTierSpec,
					Domain:                    domain,
					User:                      app.Name,
					Username:                  app.Name,
					ID:                        id,
					FullName:                  fullName,
					ServiceName:               app.ServiceName,
					Resources:                 []string{},
					Patches:                   []string{},
					JSONPatchesService:        []string{},
					JSONPatchesVirtualService: []string{},
					JSONPatchesDeploy:         []string{},
					AppParams:                 appParams,
					SysParams:                 sysParams,
					NetworkPolicyData:         registeredApps.NetworkPolicyData,
					Timestamp:                 ts,
					Region:                    brokerRegion,
				}

				// Build the application bundle.
				srcDirApp := path.Join(broker.BundleSourceBaseDir, app.Name)
				if err := broker.BuildDeploy(broker.BrokerCommonBuildSourceBaseDirDeploymentApp, srcDirApp, destDir, data); err != nil {
					log.Printf("%v", err)
					continue
				}

				// Reuse the existing context for this app, or create one on first sight.
				var appCtx *AppContext
				if c, ok := appContexts[app.Name]; ok {
					appCtx = c
				} else {
					appCtx = &AppContext{
						AuthHeaderName:    authHeaderName,
						UsernameHeader:    usernameHeader,
						CookieSecret:      cookieSecret,
						PodData:           *data,
						AvailablePods:     make([]BrokerPod, 0),
						ReservedPods:      make(map[string]BrokerPod),
						PodWatcherRunning: false,
					}
					appContexts[app.Name] = appCtx
				}

				// Register the app handler
				registerAppHandler(server, app, appCtx)

				// Start the pod watcher
				if !appCtx.PodWatcherRunning {
					watchPods(app, appCtx)
				}

				// Compute and cache checksum to know if we need to re-apply the manifests.
				prevChecksum := manifestChecksums[app.Name]
				if manifestChecksums[app.Name], err = broker.ChecksumDeploy(destDir); err != nil {
					log.Printf("failed to checksum build output directory: %v", err)
					continue
				}
				if prevChecksum != manifestChecksums[app.Name] {
					log.Printf("%s manifest checksum: %s", app.Name, manifestChecksums[app.Name])
				} else if !resync {
					// Unchanged manifests are only re-applied on a resync pass.
					continue
				}

				// Apply manifests to the cluster.
				log.Printf("deploying manifests for app: %s", destDir)
				cmd := exec.Command("sh", "-o", "pipefail", "-c", fmt.Sprintf("kustomize build %s | kubectl apply -f -", destDir))
				cmd.Dir = destDir
				stdoutStderr, err := cmd.CombinedOutput()
				if err != nil {
					log.Printf("error calling kubectl for %s: %v\n%s", app.Name, err, stdoutStderr)
					continue
				}
			}

			// Prune deleted BrokerAppConfigs, delete namespace and files.
			foundDirs, err := filepath.Glob(path.Join(broker.BuildSourceBaseDir, "*"))
			if err != nil {
				log.Printf("failed to list app directories to prune: %v", err)
			}
			for _, dirName := range foundDirs {
				appName := path.Base(dirName)

				found := false
				for _, app := range registeredApps.Apps {
					if app.Name == appName {
						found = true
						break
					}
				}
				if found {
					continue
				}

				log.Printf("removing app: %s", appName)

				// Stop the pod watcher. A directory can exist without a context
				// (for example when it was left over from a previous process or
				// the build failed before the context was created), so the lookup
				// must be checked to avoid dereferencing a nil *AppContext.
				if appCtx, ok := appContexts[appName]; ok {
					appCtx.PodWatcherRunning = false
				}

				// Remove app context
				delete(appContexts, appName)

				// Remove app from checksum cache
				delete(manifestChecksums, appName)

				// Delete the app namespace
				cmd := exec.Command("sh", "-o", "pipefail", "-c", fmt.Sprintf("kubectl delete --wait=false ns %s", appName))
				stdoutStderr, err := cmd.CombinedOutput()
				if err != nil {
					log.Printf("error calling kubectl to delete namespace %s: %v\n%s", appName, err, stdoutStderr)
				}

				// Delete the app directory
				if err := os.RemoveAll(dirName); err != nil {
					log.Printf("failed to remove app directory %s: %v", dirName, err)
				}
			}

			time.Sleep(scanPeriod)
		}
	}()

	server.InitDispatch()

	log.Printf("Initializing request routes...\n")

	// The metadata route returns the reservation whose pod IP matches the
	// address of the requester.
	server.Urls["metadata"] = func(w http.ResponseWriter, r *http.Request) {
		// NOTE(review): splitting on the first ':' only works for IPv4
		// "host:port" values; a bracketed IPv6 RemoteAddr would yield "[".
		srcIP := strings.Split(r.RemoteAddr, ":")[0]

		// Check reserved pods to match requestor IP.
		for _, appCtx := range appContexts {
			for user, pod := range appCtx.ReservedPods {
				if pod.IP != srcIP {
					continue
				}
				metadata := broker.ReservationMetadataSpec{
					IP:         pod.IP,
					SessionKey: pod.SessionKey,
					User:       user,
				}
				w.Header().Set("Content-Type", "application/json")
				w.WriteHeader(http.StatusOK)
				// The status line is already sent, so an encode failure can only be logged.
				if err := json.NewEncoder(w).Encode(metadata); err != nil {
					log.Printf("failed to encode reservation metadata for IP %s: %v", srcIP, err)
				}
				return
			}
		}

		writeResponse(w, http.StatusNotFound, fmt.Sprintf("reservation metadata not found for IP: %s", srcIP))
	}

	server.Start()
}