in node/helpers/labels.go [60:103]
func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
firstTry := true
return clientretry.RetryOnConflict(updateLabelBackoff, func() error {
var err error
var node *v1.Node
// First we try getting node from the API server cache, as it's cheaper. If it fails
// we get it from etcd to be sure to have fresh data.
if firstTry {
node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
firstTry = false
} else {
node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}
if err != nil {
return err
}
// Make a copy of the node and update the labels.
newNode := node.DeepCopy()
if newNode.Labels == nil {
newNode.Labels = make(map[string]string)
}
for key, value := range labelsToUpdate {
newNode.Labels[key] = value
}
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
}
newData, err := json.Marshal(newNode)
if err != nil {
return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
if err != nil {
return fmt.Errorf("failed to create a two-way merge patch: %v", err)
}
if _, err := kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("failed to patch the node: %v", err)
}
return nil
})
}