in images/controller/cmd/reservation_broker/reservation_broker.go [433:512]
// watchPods seeds appCtx.ReservedPods from pods that already carry a user
// reservation, then starts a background goroutine that rebuilds
// appCtx.AvailablePods every 2 seconds for as long as
// appCtx.PodWatcherRunning stays true.
//
// Reserved pods are the ones matching app.Deployment.Selector that are NOT
// labeled app.kubernetes.io/managed-by=pod-broker; available pods are the ones
// that still carry that label. Listing failures are logged, never returned.
// The function itself returns as soon as the goroutine has been launched.
func watchPods(app broker.AppConfigSpec, appCtx *AppContext) {
	appCtx.PodWatcherRunning = true

	// Get current pod reservations, those not managed by Deployment, reserved for users.
	selector := fmt.Sprintf("%s, app.kubernetes.io/managed-by notin (pod-broker)", app.Deployment.Selector)
	podResp, err := listBrokerPods(app.Name, selector)
	if err != nil {
		log.Printf("failed to list initial reserved pods: %v", err)
	} else {
		appCtx.Lock()
		for _, pod := range podResp.Items {
			if pod.Metadata.DeletionTimestamp != nil {
				// Skip terminating pods
				continue
			}
			podName := pod.Metadata.Name
			podUser, ok := pod.Metadata.Annotations["app.broker/user"]
			if !ok {
				// Not a user reservation; nothing to record.
				continue
			}

			// A pod may not have been assigned an IP yet. Keep the reservation
			// (so the user is still mapped to this pod) but leave the IP empty
			// instead of panicking on an out-of-range index.
			podIP := ""
			if len(pod.Status.PodIPs) > 0 {
				podIP = pod.Status.PodIPs[0].IP
			} else {
				log.Printf("Warning: no pod IP on existing reservation: %s", podName)
			}

			// Check the session-key lookup result directly; it must not be
			// overwritten by the lookup of any other annotation.
			sessionKey, ok := pod.Metadata.Annotations["app.broker/session-key"]
			if !ok {
				log.Printf("Warning: missing app.broker/session-key on existing reservation: %s", podName)
			}
			// NOTE: when this annotation is absent, strings.Split yields a
			// one-element slice containing "" (not an empty slice).
			userObjects := pod.Metadata.Annotations["app.broker/last-applied-object-types"]

			log.Printf("Found existing reservation: %s: %s", podName, podUser)
			appCtx.ReservedPods[podUser] = BrokerPod{
				Name:        podName,
				IP:          podIP,
				SessionKey:  sessionKey,
				UserObjects: strings.Split(userObjects, ","),
			}
		}
		appCtx.Unlock()
	}

	go func() {
		log.Printf("started pod watcher for %s", app.Name)

		// Find available pods, those currently managed by Deployment.
		// The selector does not change between iterations, so build it once.
		availableSelector := fmt.Sprintf("%s,app.kubernetes.io/managed-by=pod-broker", app.Deployment.Selector)

		for appCtx.PodWatcherRunning {
			// The list call stays under the lock so that the published
			// AvailablePods is never older than a change made by another
			// lock holder.
			appCtx.Lock()
			podResp, err := listBrokerPods(app.Name, availableSelector)
			if err != nil {
				log.Printf("failed to list pods for app: %s: %v", app.Name, err)
				// Release the lock BEFORE backing off so other users of
				// appCtx are not blocked for the whole sleep.
				appCtx.Unlock()
				time.Sleep(2 * time.Second)
				continue
			}

			// Sort by creation time, ascending (oldest pod first).
			sort.Slice(podResp.Items, func(i, j int) bool {
				return podResp.Items[i].Metadata.CreationTimestamp < podResp.Items[j].Metadata.CreationTimestamp
			})

			available := make([]BrokerPod, 0, len(podResp.Items))
			for _, pod := range podResp.Items {
				if len(pod.Status.PodIPs) == 0 {
					// No IP assigned yet; the pod cannot be handed out.
					continue
				}
				if pod.Metadata.DeletionTimestamp != nil {
					// Skip terminating pods
					continue
				}
				available = append(available, BrokerPod{
					Name: pod.Metadata.Name,
					IP:   pod.Status.PodIPs[0].IP,
				})
			}
			appCtx.AvailablePods = available
			appCtx.Unlock()

			time.Sleep(2 * time.Second)
		}
		log.Printf("stopped pod watcher for %s", app.Name)
	}()
}