in cmd/gcp-controller-manager/node_syncer.go [189:249]
func (ns *nodeSyncer) process(key string) error {
o, exists, err := ns.indexer.GetByKey(key)
if err != nil {
return fmt.Errorf("failed to get Pod %q: %v", key, err)
}
if !exists { // pod removal event
podUIDs := ns.nodes.find(key)
if len(podUIDs) == 0 {
klog.Warningf("Pod key %q not found", key)
return nil // no retry
}
if ns.delayGSARemove {
for _, uid := range podUIDs {
klog.Infof("Pod %q (UID %q) is queued for delayed removal", key, uid)
ns.podRemoveQueue.AddAfter(uid, nodeSyncerGSARemoveDelay)
}
return nil
}
nodeSet := make(map[string]bool)
for _, uid := range podUIDs {
node, gsa, found := ns.nodes.remove(uid)
if !found {
klog.Warning("Pod %q (UID %q) not found on removal event: %v", key, uid, err)
continue
}
klog.Infof("Pod %q running as %q is removed from Node %q (pod UID: %q)", key, gsa, node, uid)
nodeSet[node] = true
}
for node := range nodeSet {
if err := ns.sync(node); err != nil {
// Log only; retries will be triggered by informer's resync events.
klog.Warningf("Failed to sync Node %q for GSA removal: %v", node, err)
}
}
return nil
}
pod, ok := o.(*core.Pod)
if !ok {
return fmt.Errorf("invalid pod object from key %q: %#v", key, o)
}
ksa := serviceAccount{
Namespace: pod.ObjectMeta.Namespace,
Name: pod.Spec.ServiceAccountName,
}
gsa, found := ns.verifiedSAs.get(ksa)
if !found {
klog.V(5).Infof("ServiceAccount %q is not authorized to act as any GSA.", ksa)
return nil
}
node := pod.Spec.NodeName
klog.Infof("Adding GSA %q to Node %q where Pod %q is running as KSA %q.", gsa, node, key, ksa)
gsaLast, found := ns.nodes.add(node, pod.ObjectMeta.UID, key, gsa)
if found && gsaLast == gsa {
return nil
}
if found && gsaLast != gsa {
klog.Infof("The authorized GSA of KSA %q that Pod %q runs as has been changed from %q to %q.", ksa, key, gsaLast, gsa)
}
return ns.sync(node)
}