pkg/csi/driver.go (88 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csi
import (
"context"
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"github.com/GoogleCloudPlatform/csi-node-cache/pkg/localvolume"
)
type VolumeCreatorFunc func() (localvolume.LocalVolume, error)
// Driver is the object backing the CSI driver. It also implements identity and node services, q.v.
type Driver struct {
client *kubernetes.Clientset
endpoint string
vol localvolume.LocalVolume
nodeId string
volumeTypeMap types.NamespacedName
driverName string
driverVersion string
}
var _ csi.IdentityServer = &Driver{}
var _ csi.NodeServer = &Driver{}
// NewDriver creates a new local volume CSI driver using the given LocalVolumeCreator.
// endpoint is the csi socket, and nodeId is the id to use for csi registration.
func NewDriver(client *kubernetes.Clientset, endpoint, nodeId string, volumeTypeMap types.NamespacedName, driverName, driverVersion string) (*Driver, error) {
klog.V(4).Infof("Driver: %v version: %v running on %s", driverName, driverVersion, nodeId)
d := &Driver{
client: client,
endpoint: endpoint,
nodeId: nodeId,
volumeTypeMap: volumeTypeMap,
driverName: driverName,
driverVersion: driverVersion,
}
return d, nil
}
// Run will serve the CSI driver. Normally this will run forever; an error will be returned otherwise.
func (d *Driver) Run() error {
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
u, err := url.Parse(d.endpoint)
if err != nil {
return fmt.Errorf("cannot parse endpoint %s: %w", d.endpoint, err)
}
var addr string
if u.Scheme == "unix" {
addr = u.Path
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove %s: %w", addr, err)
}
listenDir := filepath.Dir(addr)
if _, err := os.Stat(listenDir); err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("expected Kubelet plugin watcher to create parent dir %s but did not find such a dir", listenDir)
} else {
return fmt.Errorf("failed to stat %s: %w", listenDir, err)
}
}
} else if u.Scheme == "tcp" {
addr = u.Host
} else {
return fmt.Errorf("%v endpoint scheme not supported", u.Scheme)
}
listener, err := net.Listen(u.Scheme, addr)
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
server := grpc.NewServer(opts...)
csi.RegisterIdentityServer(server, d)
csi.RegisterNodeServer(server, d)
if err := server.Serve(listener); err != nil {
return fmt.Errorf("serving failed: %w", err)
}
return nil
}
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
klog.V(4).Infof("%s called with request: %+v", info.FullMethod, req)
resp, err := handler(ctx, req)
if err != nil {
klog.Errorf("%s returned with error: %v", info.FullMethod, err)
} else {
klog.V(4).Infof("%s returned with response: %+v", info.FullMethod, resp)
}
return resp, err
}