func watchPods()

in images/controller/cmd/reservation_broker/reservation_broker.go [433:512]


func watchPods(app broker.AppConfigSpec, appCtx *AppContext) {
	appCtx.PodWatcherRunning = true

	// Get current pod reservations, those not managed by Deployment, reserved for users.
	selector := fmt.Sprintf("%s, app.kubernetes.io/managed-by notin (pod-broker)", app.Deployment.Selector)
	podResp, err := listBrokerPods(app.Name, selector)
	if err != nil {
		log.Printf("failed to list initial reserved pods: %v", err)
	} else {
		appCtx.Lock()
		for _, pod := range podResp.Items {
			if pod.Metadata.DeletionTimestamp != nil {
				// Skip terminating pods
				continue
			}
			podName := pod.Metadata.Name
			podIP := pod.Status.PodIPs[0].IP
			if podUser, ok := pod.Metadata.Annotations["app.broker/user"]; ok {
				sessionKey, ok := pod.Metadata.Annotations["app.broker/session-key"]
				userObjects, ok := pod.Metadata.Annotations["app.broker/last-applied-object-types"]
				if !ok {
					log.Printf("Warning: missing app.broker/session-key on existing reservation: %s", podName)
				}
				log.Printf("Found existing reservation: %s: %s", podName, podUser)
				appCtx.ReservedPods[podUser] = BrokerPod{
					Name:        podName,
					IP:          podIP,
					SessionKey:  sessionKey,
					UserObjects: strings.Split(userObjects, ","),
				}
			}
		}
		appCtx.Unlock()
	}

	go func() {
		log.Printf("started pod watcher for %s", app.Name)
		for {
			if !appCtx.PodWatcherRunning {
				break
			}

			appCtx.Lock()

			// Find available pods, those currently managed by Deployment.
			selector := fmt.Sprintf("%s,app.kubernetes.io/managed-by=pod-broker", app.Deployment.Selector)
			podResp, err := listBrokerPods(app.Name, selector)
			if err != nil {
				log.Printf("failed to list pods for app: %s: %v", app.Name, err)
				time.Sleep(2 * time.Second)
				appCtx.Unlock()
				continue
			}

			// Sort by creation time, descending order.
			sort.Slice(podResp.Items, func(i, j int) bool {
				return podResp.Items[i].Metadata.CreationTimestamp < podResp.Items[j].Metadata.CreationTimestamp
			})

			appCtx.AvailablePods = make([]BrokerPod, 0)
			for _, pod := range podResp.Items {
				if len(pod.Status.PodIPs) == 0 {
					continue
				}
				if pod.Metadata.DeletionTimestamp != nil {
					// Skip terminating pods
					continue
				}
				appCtx.AvailablePods = append(appCtx.AvailablePods, BrokerPod{
					Name: pod.Metadata.Name,
					IP:   pod.Status.PodIPs[0].IP,
				})
			}
			appCtx.Unlock()

			time.Sleep(2 * time.Second)
		}
		log.Printf("stopped pod watcher for %s", app.Name)
	}()
}