// pkg/csi/controller.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csi
import (
"context"
"fmt"
"strings"
"time"
"google.golang.org/api/compute/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
)
const (
	// finalizerLabel is the finalizer placed on cache PVCs so they are not
	// garbage-collected while this controller still considers them in use.
	finalizerLabel = "node-cache.gke.io/in-use"
	// zoneLabel is the node label holding the GCE zone of the node.
	zoneLabel = "topology.gke.io/zone"
)
// volumeHandle is the parsed form of a GCE PD CSI volume handle of the shape
// "projects/<project>/zones/<zone>/disks/<name>" (see parseVolumeHandle).
type volumeHandle struct {
	project string
	zone    string
	name    string
}
// reconciler reconciles Nodes into the volume-type mapping configmap and
// manages the per-node PD cache PVCs. It embeds the manager's client.
type reconciler struct {
	client.Client
	Scheme *runtime.Scheme
	// k8sClient is a typed clientset; its usage is not shown in this chunk.
	k8sClient *kubernetes.Clientset
	// namespace is where the mapping configmap and cache PVCs live.
	namespace string
	// volumeTypeConfigMap names the configmap holding the node->volume-type mapping.
	volumeTypeConfigMap string
	// pdStorageClass is the storage class for PD cache PVCs; when empty,
	// PD volume types are rejected.
	pdStorageClass string
	// attacher attaches PD disks to nodes; when nil the PVC controller is
	// not started (see NewManager).
	attacher Attacher
}
// pvcReconciler reconciles PersistentVolumeClaims (the per-node cache PVCs),
// reusing the shared reconciler state.
type pvcReconciler struct {
	*reconciler
}
// Attacher abstracts checking and performing PD disk attachment to a node,
// allowing the GCE implementation to be faked in tests.
type Attacher interface {
	// diskIsAttached reports whether the disk identified by the CSI volume
	// handle is attached to the named node.
	diskIsAttached(ctx context.Context, volume, nodeName string) (bool, error)
	// attachDisk attaches the disk identified by the CSI volume handle to
	// the named node.
	attachDisk(ctx context.Context, volume, nodeName string) error
}
// attacher is the production Attacher, backed by the Kubernetes API (to read
// node zone labels) and the GCE compute API (to inspect and attach disks).
type attacher struct {
	k8sClient  client.Client
	computeSvc *compute.Service
}

// Compile-time check that attacher satisfies Attacher.
var _ Attacher = &attacher{}
// NewAttacher builds the production Attacher from the given rest config,
// using a controller-runtime client and the default GCE compute service.
func NewAttacher(ctx context.Context, cfg *rest.Config) (Attacher, error) {
	c, err := client.New(cfg, client.Options{Scheme: scheme.Scheme})
	if err != nil {
		return nil, err
	}
	computeSvc, err := compute.NewService(ctx)
	if err != nil {
		return nil, err
	}
	a := &attacher{
		k8sClient:  c,
		computeSvc: computeSvc,
	}
	return a, nil
}
// ControllerInit registers the required API types into the global scheme.
// It panics (via utilruntime.Must) if registration fails.
func ControllerInit() {
	// This should get all core objects.
	utilruntime.Must(scheme.AddToScheme(scheme.Scheme))
}
// NewManager creates a controller-runtime manager whose cache is scoped to
// the given namespace, wires up the node reconciler (which maintains the
// volume-type mapping configmap named volumeTypeConfigMap) and, when attach
// is non-nil, the PVC reconciler that keeps PD caches attached. The returned
// manager still needs to be started by the caller.
func NewManager(cfg *rest.Config, namespace, volumeTypeConfigMap string, attach Attacher, pdStorageClass string) (ctrl.Manager, error) {
	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
		Scheme: scheme.Scheme,
		Cache: cache.Options{
			// Restrict the cache to our namespace; cluster-scoped objects
			// (nodes) are unaffected by this restriction.
			DefaultNamespaces: map[string]cache.Config{
				namespace: {},
			},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("unable to create manager: %w", err)
	}
	k8sClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("unable to create k8s client: %w", err)
	}
	rec := &reconciler{
		Client:              mgr.GetClient(),
		k8sClient:           k8sClient,
		Scheme:              mgr.GetScheme(),
		namespace:           namespace,
		volumeTypeConfigMap: volumeTypeConfigMap,
		pdStorageClass:      pdStorageClass,
		attacher:            attach,
	}
	if err := ctrl.NewControllerManagedBy(mgr).
		Named("node").
		Watches(&corev1.Node{}, &handler.EnqueueRequestForObject{}).
		Complete(rec); err != nil {
		return nil, fmt.Errorf("unable to create node controller: %w", err)
	}
	// The PVC controller needs an attacher; without one, only the node
	// mapping is maintained.
	if rec.attacher != nil {
		if err := ctrl.NewControllerManagedBy(mgr).
			Named("pvc").
			Watches(&corev1.PersistentVolumeClaim{}, &handler.EnqueueRequestForObject{}).
			Complete(&pvcReconciler{rec}); err != nil {
			return nil, fmt.Errorf("unable to create pvc controller: %w", err)
		}
	}
	// Error strings are lowercase per Go convention (staticcheck ST1005).
	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		return nil, fmt.Errorf("unable to set up health check: %w", err)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		return nil, fmt.Errorf("unable to set up ready check: %w", err)
	}
	return mgr, nil
}
// Reconcile maintains the node -> volume-type mapping configmap. For each
// cache node it records the node's volume type info; for PD-cache nodes it
// also ensures a backing PVC exists. When a node is gone or being deleted it
// sweeps PVCs whose node no longer exists.
func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)
	var node corev1.Node
	if err := r.Get(ctx, req.NamespacedName, &node); err != nil {
		// Node unreadable (likely deleted): sweep orphaned PVCs.
		// NOTE(review): the sweep's error return is ignored here —
		// presumably best-effort; confirm.
		log.Error(err, "get node for reconcile", "node", req.NamespacedName.Name)
		r.deleteOrphanedPDs(ctx)
		return ctrl.Result{}, nil
	}
	if node.DeletionTimestamp != nil {
		// Node is being deleted; clean up any PVCs it leaves behind.
		r.deleteOrphanedPDs(ctx)
		// TODO: clean up old mappings?
		return ctrl.Result{}, nil
	}
	// Load the mapping configmap, creating an empty one in memory if it does
	// not exist yet (written out at the end).
	mustCreateMapping := false
	var mapping map[string]volumeTypeInfo
	var configMap corev1.ConfigMap
	err := r.Get(ctx, types.NamespacedName{Namespace: r.namespace, Name: r.volumeTypeConfigMap}, &configMap)
	if apierrors.IsNotFound(err) {
		mustCreateMapping = true
		configMap.SetNamespace(r.namespace)
		configMap.SetName(r.volumeTypeConfigMap)
		mapping = map[string]volumeTypeInfo{}
	} else if err == nil {
		if mapping, err = getVolumeTypeMapping(configMap.Data); err != nil {
			// A corrupt mapping is dropped and rebuilt rather than blocking
			// reconciliation.
			log.Error(err, "bad mapping (ignored, mapping recreated)")
			mapping = map[string]volumeTypeInfo{}
		}
	} else {
		// Transient read error: log and wait for the next event.
		log.Error(err, "get mapping", "mapping", fmt.Sprintf("%s/%s", r.namespace, r.volumeTypeConfigMap))
		return ctrl.Result{}, nil
	}
	if configMap.Data == nil {
		configMap.Data = map[string]string{}
	}
	info, err := getVolumeTypeFromNode(&node)
	// NOTE(review): matching on error text is fragile; a sentinel error from
	// getVolumeTypeFromNode checked with errors.Is would be more robust.
	if err != nil && strings.Contains(err.Error(), "label not found on node") {
		log.Info("skipping non-cache node", "node", node.GetName())
		return ctrl.Result{}, nil
	} else if err != nil {
		return ctrl.Result{}, err
	}
	if info.VolumeType == pdVolumeType {
		if r.pdStorageClass == "" {
			return ctrl.Result{}, fmt.Errorf("No PD storage class has been defined, PD volumes can't be used")
		}
		// Ensure the node's cache PVC exists and record its bound PV, if any.
		if err := r.updatePdVolumeType(ctx, node.GetName(), &info); err != nil {
			return ctrl.Result{}, err
		}
	}
	// Persist the updated mapping, creating or updating the configmap as
	// appropriate.
	mapping[node.GetName()] = info
	if err := writeVolumeTypeMapping(configMap.Data, mapping); err != nil {
		log.Error(err, "write mapping", "node", node.GetName())
		return ctrl.Result{}, err
	}
	if mustCreateMapping {
		if err := r.Create(ctx, &configMap); err != nil {
			log.Error(err, "create configmap")
			return ctrl.Result{}, err // requeue
		}
	} else {
		if err := r.Update(ctx, &configMap); err != nil {
			log.Error(err, "update configmap")
			return ctrl.Result{}, err // requeue
		}
	}
	log.Info("update", "node", node.GetName(), "info", info)
	return ctrl.Result{}, nil
}
// updatePdVolumeType ensures a PD-backed cache node has a PVC (named after
// the node) in r.namespace, and records the bound PV name into info.Disk when
// the claim is already bound. It is a no-op for non-PD volume types.
func (r *reconciler) updatePdVolumeType(ctx context.Context, node string, info *volumeTypeInfo) error {
	if info.VolumeType != pdVolumeType {
		return nil
	}
	if info.Size.IsZero() {
		return fmt.Errorf("no size given for PD cache on node %s", node)
	}
	var pvc corev1.PersistentVolumeClaim
	needCreate := false
	err := r.Get(ctx, types.NamespacedName{Namespace: r.namespace, Name: node}, &pvc)
	switch {
	case apierrors.IsNotFound(err):
		// No PVC yet: build the spec for a block-mode, RWO claim of the
		// requested cache size using the configured PD storage class.
		needCreate = true
		pvc.SetName(node)
		pvc.SetNamespace(r.namespace)
		pvc.Spec.StorageClassName = ptr.To(r.pdStorageClass)
		pvc.Spec.VolumeMode = ptr.To(corev1.PersistentVolumeBlock)
		pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
		pvc.Spec.Resources.Requests = map[corev1.ResourceName]resource.Quantity{
			corev1.ResourceStorage: info.Size,
		}
	case err != nil:
		// Bug fix: unexpected Get errors were previously ignored, causing an
		// empty PVC object to be passed to updatePVCForLifecycle.
		return fmt.Errorf("getting cache pvc for node %s: %w", node, err)
	case pvc.Status.Phase == corev1.ClaimBound:
		info.Disk = pvc.Spec.VolumeName
	}
	return r.updatePVCForLifecycle(ctx, &pvc, needCreate)
}
// updatePVCForLifecycle guarantees the in-use finalizer is present on the
// PVC, then persists it: Create when needCreate is set, Update when only the
// finalizer was added, and nothing otherwise.
func (r *reconciler) updatePVCForLifecycle(ctx context.Context, pvc *corev1.PersistentVolumeClaim, needCreate bool) error {
	hasFinalizer := false
	for _, f := range pvc.Finalizers {
		if f == finalizerLabel {
			hasFinalizer = true
			break
		}
	}
	if !hasFinalizer {
		pvc.Finalizers = append(pvc.Finalizers, finalizerLabel)
	}
	if needCreate {
		return r.Create(ctx, pvc)
	}
	if !hasFinalizer {
		// Finalizer was just added to an existing object; persist it.
		return r.Update(ctx, pvc)
	}
	return nil
}
// Reconcile keeps a cache PVC consistent with its node: PVCs are named after
// their node, so req identifies both. It records the bound PV name in the
// mapping configmap, deletes PVCs whose node is gone, and attaches the
// backing PD to the node when it is not yet attached.
func (r *pvcReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)
	pvcName := req.NamespacedName.Name
	var configMap corev1.ConfigMap
	err := r.Get(ctx, types.NamespacedName{Namespace: r.namespace, Name: r.volumeTypeConfigMap}, &configMap)
	if err != nil {
		// The node controller creates the mapping; requeue until it exists.
		log.Info("PVC reconcile before mapping available", "pvc", pvcName, "error", err)
		return ctrl.Result{Requeue: true}, nil
	}
	mapping, err := getVolumeTypeMapping(configMap.Data)
	if err != nil {
		return ctrl.Result{}, err
	}
	info, found := mapping[pvcName]
	if !found {
		return ctrl.Result{}, fmt.Errorf("unknown node or pvc %s", pvcName)
	}
	var pvc corev1.PersistentVolumeClaim
	if err := r.Get(ctx, req.NamespacedName, &pvc); err != nil {
		return ctrl.Result{}, fmt.Errorf("reconciling %s: %w", req.NamespacedName, err)
	}
	var node corev1.Node
	if err := r.Get(ctx, types.NamespacedName{Name: pvcName}, &node); err != nil {
		if !apierrors.IsNotFound(err) {
			return ctrl.Result{}, fmt.Errorf("can't get node %s for pvc: %w", pvc.GetName(), err)
		}
		// Treat a missing node like one being deleted.
		node.DeletionTimestamp = &metav1.Time{Time: time.Now()}
	}
	if node.DeletionTimestamp != nil {
		// The node doesn't exist, the PVC should be deleted.
		return ctrl.Result{}, r.deletePVC(ctx, &pvc)
	}
	mustRequeue := false
	// Update the mapping with the PV name, if known.
	if pvc.Status.Phase == corev1.ClaimBound && info.Disk != pvc.Spec.VolumeName {
		// The outer condition already guarantees the names differ; only the
		// non-empty check is needed to detect a stale mapping.
		if info.Disk != "" {
			log.Error(nil, "pv mapping mismatch, will update", "old-disk", info.Disk, "curr-disk", pvc.Spec.VolumeName)
		}
		info.Disk = pvc.Spec.VolumeName
		mapping[pvcName] = info
		if err := writeVolumeTypeMapping(configMap.Data, mapping); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Update(ctx, &configMap); err != nil {
			log.Error(err, "mapping update, will requeue")
			mustRequeue = true
		}
	}
	// If the PVC is bound but not attached, attach it.
	if pvc.Status.Phase == corev1.ClaimBound {
		var pv corev1.PersistentVolume
		if err := r.Get(ctx, types.NamespacedName{Name: pvc.Spec.VolumeName}, &pv); err != nil {
			return ctrl.Result{}, fmt.Errorf("can't get volume for pvc %s: %w", pvc.GetName(), err)
		}
		// Guard against non-CSI PVs, which would otherwise panic below.
		if pv.Spec.CSI == nil {
			return ctrl.Result{}, fmt.Errorf("pv %s for pvc %s is not a CSI volume", pv.GetName(), pvc.GetName())
		}
		attached, err := r.attacher.diskIsAttached(ctx, pv.Spec.CSI.VolumeHandle, node.GetName())
		if err != nil {
			return ctrl.Result{}, fmt.Errorf("could not check attachment for pvc %s, pv %s: %w", pvc.GetName(), pv.GetName(), err)
		}
		if !attached {
			if err := r.attacher.attachDisk(ctx, pv.Spec.CSI.VolumeHandle, node.GetName()); err != nil {
				return ctrl.Result{}, fmt.Errorf("could not attach pv %s to node %s: %w", pv.GetName(), node.GetName(), err)
			}
			log.Info("attach", "pvc", pvc.GetName())
		}
	}
	// Otherwise everything looks good.
	log.Info("reconciled, looks good", "pvc", req.NamespacedName)
	return ctrl.Result{Requeue: mustRequeue}, nil
}
// deletePVC marks the PVC for deletion and then strips our in-use finalizer
// so the API server can actually remove the object.
func (r *reconciler) deletePVC(ctx context.Context, pvc *corev1.PersistentVolumeClaim) error {
	if err := r.Delete(ctx, pvc); err != nil {
		return fmt.Errorf("Delete of pvc/%s failed: %w", pvc.GetName(), err)
	}
	kept := []string{}
	removed := false
	for _, f := range pvc.Finalizers {
		if f == finalizerLabel {
			removed = true
			continue
		}
		kept = append(kept, f)
	}
	if !removed {
		// Our finalizer was not present; nothing further to persist.
		return nil
	}
	pvc.Finalizers = kept
	return r.Update(ctx, pvc)
}
// deleteOrphanedPDs removes every cache PVC whose same-named node is absent
// or already terminating.
func (r *reconciler) deleteOrphanedPDs(ctx context.Context) error {
	var pvcs corev1.PersistentVolumeClaimList
	if err := r.List(ctx, &pvcs); err != nil {
		return err
	}
	var nodes corev1.NodeList
	if err := r.List(ctx, &nodes); err != nil {
		return err
	}
	// Build the set of live (non-terminating) node names.
	live := make(map[string]struct{}, len(nodes.Items))
	for i := range nodes.Items {
		if nodes.Items[i].DeletionTimestamp == nil {
			live[nodes.Items[i].GetName()] = struct{}{}
		}
	}
	// Any PVC not matching a live node is an orphan.
	for i := range pvcs.Items {
		if _, ok := live[pvcs.Items[i].GetName()]; ok {
			continue
		}
		if err := r.deletePVC(ctx, &pvcs.Items[i]); err != nil {
			return err
		}
	}
	return nil
}
// diskIsAttached reports whether the disk named by the CSI volume handle is
// currently attached to the GCE instance backing nodeName. The instance is
// located via the zone label on the Node object rather than the zone segment
// of the volume handle.
func (a *attacher) diskIsAttached(ctx context.Context, volume, nodeName string) (bool, error) {
	vol, err := parseVolumeHandle(volume)
	if err != nil {
		return false, err
	}
	var node corev1.Node
	if err := a.k8sClient.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
		return false, err
	}
	zone, found := node.GetLabels()[zoneLabel]
	if !found {
		// Lowercased per Go error-string convention (staticcheck ST1005).
		return false, fmt.Errorf("no zone found for node %s", nodeName)
	}
	instance, err := a.computeSvc.Instances.Get(vol.project, zone, nodeName).Context(ctx).Do()
	if err != nil {
		return false, fmt.Errorf("getting instance %s in zone %s: %w", nodeName, zone, err)
	}
	// attachDisk sets DeviceName to the disk name, so match on that.
	for _, disk := range instance.Disks {
		if disk.DeviceName == vol.name {
			return true, nil
		}
	}
	return false, nil
}
// attachDisk attaches the PD named by the CSI volume handle to the GCE
// instance nodeName as a read-write persistent disk, then waits (polling
// every 5s, up to 2 minutes) for the zonal operation to complete.
// NOTE(review): the instance lookup uses vol.zone (the disk's zone), which
// assumes the node lives in the same zone as its PD — confirm.
func (a *attacher) attachDisk(ctx context.Context, volume, nodeName string) error {
	vol, err := parseVolumeHandle(volume)
	if err != nil {
		return err
	}
	attach := &compute.AttachedDisk{
		DeviceName: vol.name,
		Source:     sourceFromVolumeHandle(volume),
		Mode:       "READ_WRITE",
		Type:       "PERSISTENT",
	}
	op, err := a.computeSvc.Instances.AttachDisk(vol.project, vol.zone, nodeName, attach).Context(ctx).Do()
	if err != nil {
		return err
	}
	// Poll the zone operation until it reports DONE or the timeout expires.
	err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
		pollOp, err := a.computeSvc.ZoneOperations.Get(vol.project, vol.zone, op.Name).Context(ctx).Do()
		if err != nil {
			return false, err
		}
		if pollOp == nil || pollOp.Status != "DONE" {
			return false, nil // retry
		}
		// A DONE operation can still carry errors; surface them all at once.
		if pollOp.Error != nil {
			errs := []string{}
			for _, e := range pollOp.Error.Errors {
				errs = append(errs, fmt.Sprintf("%v", e))
			}
			return false, fmt.Errorf("error waiting for attach to %s: %v", nodeName, errs)
		}
		return true, nil
	})
	if err != nil {
		return fmt.Errorf("could not attach %s to %s: %w", volume, nodeName, err)
	}
	return nil
}
// parseVolumeHandle splits a GCE PD CSI volume handle of the form
// "projects/<project>/zones/<zone>/disks/<name>" into its components.
// Example: projects/mattcary-gke-dev3/zones/us-central1-b/disks/pvc-eeb37e7c-faa6-4287-9114-4ee7ca9f5d0a
func parseVolumeHandle(volume string) (volumeHandle, error) {
	parts := strings.Split(volume, "/")
	// Robustness fix: also require the literal "projects"/"zones"/"disks"
	// segments, so arbitrary 6-segment paths are rejected.
	if len(parts) != 6 || parts[0] != "projects" || parts[2] != "zones" || parts[4] != "disks" {
		return volumeHandle{}, fmt.Errorf("bad volume handle %s", volume)
	}
	return volumeHandle{
		project: parts[1],
		zone:    parts[3],
		name:    parts[5],
	}, nil
}
// sourceFromVolumeHandle converts a CSI volume handle into the fully
// qualified compute API URL expected in AttachedDisk.Source.
func sourceFromVolumeHandle(volume string) string {
	const apiPrefix = "https://www.googleapis.com/compute/v1/"
	return apiPrefix + volume
}