in pkg/controllers/nodeclaim/capacityreservation/controller.go [107:157]
func (c *Controller) syncCapacityType(ctx context.Context, capacityType string, nc *karpv1.NodeClaim) (bool, error) {
// We won't be able to sync deleting NodeClaims, and there's no real need to either as they're already draining.
if !nc.DeletionTimestamp.IsZero() {
return false, nil
}
// For now we only account for the case where a reserved NodeClaim becomes an on-demand NodeClaim. This does not
// account for on-demand NodeClaims being promoted to reserved since that is not natively supported by Karpenter.
if capacityType != karpv1.CapacityTypeOnDemand {
return false, nil
}
updated := false
if nc.Labels[karpv1.CapacityTypeLabelKey] == karpv1.CapacityTypeReserved {
stored := nc.DeepCopy()
nc.Labels[karpv1.CapacityTypeLabelKey] = karpv1.CapacityTypeOnDemand
delete(nc.Labels, cloudprovider.ReservationIDLabel)
if err := c.kubeClient.Patch(ctx, nc, client.MergeFrom(stored)); client.IgnoreNotFound(err) != nil {
return false, serrors.Wrap(fmt.Errorf("patching nodeclaim, %w", err), "NodeClaim", klog.KObj(nc))
}
updated = true
}
// If the reservation expired before the NodeClaim became registered, there may not be a Node on the cluster. Note
// that there should never be duplicate Nodes for a given NodeClaim, but handling this user-induced error is more
// straightforward than handling the duplicate error.
nodes, err := nodeclaimutils.AllNodesForNodeClaim(ctx, c.kubeClient, nc)
if err != nil {
return false, serrors.Wrap(fmt.Errorf("listing nodes for nodeclaim, %w", err), "NodeClaim", klog.KObj(nc))
}
for _, n := range nodes {
if !n.DeletionTimestamp.IsZero() {
continue
}
// Skip Nodes which haven't been registered since we still may not have synced labels. We'll get it on the next
// iteration.
if n.Labels[karpv1.NodeRegisteredLabelKey] != "true" {
continue
}
if n.Labels[karpv1.CapacityTypeLabelKey] != karpv1.CapacityTypeReserved {
continue
}
stored := n.DeepCopy()
n.Labels[karpv1.CapacityTypeLabelKey] = karpv1.CapacityTypeOnDemand
delete(n.Labels, cloudprovider.ReservationIDLabel)
if err := c.kubeClient.Patch(ctx, n, client.MergeFrom(stored)); client.IgnoreNotFound(err) != nil {
return false, serrors.Wrap(fmt.Errorf("patching node, %w", err), "Node", klog.KObj(n))
}
updated = true
}
return updated, nil
}