pkg/csi/cache.go (140 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package csi import ( "context" "fmt" "slices" "strings" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "github.com/GoogleCloudPlatform/csi-node-cache/pkg/common" "github.com/GoogleCloudPlatform/csi-node-cache/pkg/localvolume" ) const ( tmpfsPath = "/local/tmpfs" lssdDevice = "/dev/md/lssd" lssdPath = "/local/lssd" pdPath = "/local/pd" volumeTypeInfoKey = "volume-types" pdVolumeType = "pd" ) type volumeTypeInfo struct { VolumeType string Size resource.Quantity Disk string } // createCacheVolume creates a volume by looking for the node in the volume type // map and returning the appropriate local volume. func createCacheVolume(ctx context.Context, client *kubernetes.Clientset, nodeName string, volumeTypeMapName types.NamespacedName) (localvolume.LocalVolume, error) { var volumeTypeMap *corev1.ConfigMap if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 1*time.Minute, true, func(ctx context.Context) (bool, error) { var err error volumeTypeMap, err = client.CoreV1().ConfigMaps(volumeTypeMapName.Namespace).Get(ctx, volumeTypeMapName.Name, metav1.GetOptions{}) if err != nil { klog.Errorf("Failed to get volume type map, retrying: %v", err) return false, nil // retry } return true, nil }); err != nil { return nil, common.NewVolumePendingError(fmt.Errorf("no node cache volume type found: %w", err)) } types, err := getVolumeTypeMapping(volumeTypeMap.Data) if err != nil { // An error means a badly formed configmap, which is terminal (not a NewVolumePendingError). return nil, err } info, found := types[nodeName] if !found { // An unknown type is terminal. return nil, common.NewVolumePendingError(fmt.Errorf("No volume type information for %s found in %s/%s", nodeName, volumeTypeMapName.Namespace, volumeTypeMapName.Name)) } var vol localvolume.LocalVolume switch info.VolumeType { case "tmpfs": vol, err = localvolume.NewTmpfsVolume(ctx, tmpfsPath, info.Size) case "lssd": vol, err = localvolume.NewLocalSSDVolume(lssdDevice, lssdPath) case "pd": vol, err = localvolume.NewPDVolume(info.Disk, pdPath) default: err = fmt.Errorf("Unknown volume type from type info %v", info) } return vol, err } func getVolumeTypeMapping(configMapData map[string]string) (map[string]volumeTypeInfo, error) { nodes, found := configMapData[volumeTypeInfoKey] if !found { return nil, fmt.Errorf("%s not found in volume type config map", volumeTypeInfoKey) } typeMap := map[string]volumeTypeInfo{} for _, line := range strings.Split(nodes, "\n") { line = strings.TrimSpace(line) if line == "" { continue } items := strings.Split(line, ",") if len(items) < 2 { return nil, fmt.Errorf("Bad line in volume type config map: %s", line) } node := strings.TrimSpace(items[0]) if _, found := typeMap[node]; found { return nil, fmt.Errorf("node %s duplicated in volume type config map: %s", node, line) } var info volumeTypeInfo for _, item := range items[1:] { parts := strings.SplitN(item, "=", 2) trimmed := strings.TrimSpace(parts[0]) switch trimmed { case "type": info.VolumeType = strings.TrimSpace(parts[1]) case "size": szStr := strings.TrimSpace(parts[1]) q, err := resource.ParseQuantity(szStr) if err != nil { return nil, fmt.Errorf("bad size in volume type config map: %s", line) } info.Size = q case "disk": info.Disk = strings.TrimSpace(parts[1]) default: return nil, fmt.Errorf("bad key %s in volume type config map: %s", trimmed, line) } } typeMap[node] = info } return typeMap, nil } func writeVolumeTypeMapping(configMapData map[string]string, typeMap map[string]volumeTypeInfo) error { lines := make([]string, 0, len(typeMap)) for node, info := range typeMap { line := fmt.Sprintf("%s,type=%s", node, info.VolumeType) if !info.Size.IsZero() { line += fmt.Sprintf(",size=%s", info.Size.String()) } if info.Disk != "" { line += fmt.Sprintf(",disk=%s", info.Disk) } lines = append(lines, line) } slices.Sort(lines) configMapData[volumeTypeInfoKey] = strings.Join(lines, "\n") return nil } func getVolumeTypeFromNode(node *corev1.Node) (volumeTypeInfo, error) { labels := node.GetLabels() volumeType, found := labels[common.VolumeTypeLabel] if !found { return volumeTypeInfo{}, fmt.Errorf("%s label not found on node %s", common.VolumeTypeLabel, node.GetName()) } vti := volumeTypeInfo{VolumeType: volumeType} szStr, found := labels[common.SizeLabel] if found { q, err := resource.ParseQuantity(szStr) if err != nil { return volumeTypeInfo{}, fmt.Errorf("bad size label %s=%s on %s", common.SizeLabel, szStr, node.GetName()) } vti.Size = q } return vti, nil }