func()

in cmd/gcp-controller-manager/node_syncer.go [189:249]


func (ns *nodeSyncer) process(key string) error {
	o, exists, err := ns.indexer.GetByKey(key)
	if err != nil {
		return fmt.Errorf("failed to get Pod %q: %v", key, err)
	}
	if !exists { // pod removal event
		podUIDs := ns.nodes.find(key)
		if len(podUIDs) == 0 {
			klog.Warningf("Pod key %q not found", key)
			return nil // no retry
		}
		if ns.delayGSARemove {
			for _, uid := range podUIDs {
				klog.Infof("Pod %q (UID %q) is queued for delayed removal", key, uid)
				ns.podRemoveQueue.AddAfter(uid, nodeSyncerGSARemoveDelay)
			}
			return nil
		}
		nodeSet := make(map[string]bool)
		for _, uid := range podUIDs {
			node, gsa, found := ns.nodes.remove(uid)
			if !found {
				klog.Warning("Pod %q (UID %q) not found on removal event: %v", key, uid, err)
				continue
			}
			klog.Infof("Pod %q running as %q is removed from Node %q (pod UID: %q)", key, gsa, node, uid)
			nodeSet[node] = true
		}
		for node := range nodeSet {
			if err := ns.sync(node); err != nil {
				// Log only; retries will be triggered by informer's resync events.
				klog.Warningf("Failed to sync Node %q for GSA removal: %v", node, err)
			}
		}
		return nil
	}
	pod, ok := o.(*core.Pod)
	if !ok {
		return fmt.Errorf("invalid pod object from key %q: %#v", key, o)
	}

	ksa := serviceAccount{
		Namespace: pod.ObjectMeta.Namespace,
		Name:      pod.Spec.ServiceAccountName,
	}
	gsa, found := ns.verifiedSAs.get(ksa)
	if !found {
		klog.V(5).Infof("ServiceAccount %q is not authorized to act as any GSA.", ksa)
		return nil
	}
	node := pod.Spec.NodeName
	klog.Infof("Adding GSA %q to Node %q where Pod %q is running as KSA %q.", gsa, node, key, ksa)
	gsaLast, found := ns.nodes.add(node, pod.ObjectMeta.UID, key, gsa)
	if found && gsaLast == gsa {
		return nil
	}
	if found && gsaLast != gsa {
		klog.Infof("The authorized GSA of KSA %q that Pod %q runs as has been changed from %q to %q.", ksa, key, gsaLast, gsa)
	}
	return ns.sync(node)
}