pkg/nitro_enclaves_cpu_plugin/device_plugin.go (224 lines of code) (raw):

// Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package nitro_enclaves_cpu_plugin import ( "errors" "fmt" "google.golang.org/grpc/credentials/insecure" "k8s-ne-device-plugin/pkg/config" "k8s-ne-device-plugin/pkg/nitro_enclaves_device_monitor" "net" "os" "path" "strconv" "strings" "time" "github.com/golang/glog" "golang.org/x/net/context" "google.golang.org/grpc" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" ) const ( deviceName = "nitro_enclaves_cpus" devicePluginServerReadyTimeout = 10 deviceOfflineCPUsPath = "/sys/devices/system/cpu/offline" ) var cpuIdCounter = 0 // NitroEnclavesCPUDevicePlugin implements the Kubernetes device plugin API type NitroEnclavesCPUDevicePlugin struct { devices []*pluginapi.Device stop chan interface{} server *grpc.Server pluginapi.DevicePluginServer nitro_enclaves_device_monitor.IBasicDevicePlugin } func (necdp *NitroEnclavesCPUDevicePlugin) socketPath() string { return pluginapi.DevicePluginPath + deviceName + ".sock" } func (necdp *NitroEnclavesCPUDevicePlugin) ResourceName() string { return "aws.ec2.nitro/" + deviceName } // determineAdvisableCPUs reads the number of offline cpus from /sys/devices/system/cpu/offline func determineAdvisableCPUs(data string) (int, error) { // Handle empty/unknown case content := strings.TrimSpace(data) if content == "" { return 0, nil } total := 0 ranges := strings.Split(content, ",") for _, r := range ranges { parts := strings.Split(r, "-") switch len(parts) { case 1: // Single CPU // Ensure that parts is a valid number _, err := strconv.Atoi(parts[0]) if err != nil { return 0, fmt.Errorf("invalid CPU number: %s, parsing caused error: %w", r, err) } total++ case 2: // CPU range start, err1 := strconv.Atoi(parts[0]) end, err2 := strconv.Atoi(parts[1]) if err1 != nil || err2 != nil { return 0, fmt.Errorf("invalid CPU range: %s", r) } total += end - start + 1 default: return 0, fmt.Errorf("malformed CPU range: %s", r) } } return total, nil } func generateEnclaveCPUID(deviceName string) string { ctr := cpuIdCounter cpuIdCounter++ return deviceName + "_" + strconv.Itoa(ctr) } // Register the device plugin with Kubelet. func (necdp *NitroEnclavesCPUDevicePlugin) register(kubeletEndpoint, resourceName string) error { glog.V(0).Info("Attempting to connect to kubelet...") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), //lint:ignore SA1019 grpc.WithBlock is deprecated, not supported by grpc.NewClient grpc.WithBlock(), grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { return net.DialUnix("unix", nil, &net.UnixAddr{Name: addr, Net: "unix"}) }), } //lint:ignore SA1019 grpc.DialContext is deprecated // todo replace by grpc.NewClient conn, err := grpc.DialContext( ctx, kubeletEndpoint, opts..., ) defer func(conn *grpc.ClientConn) { err := conn.Close() if err != nil { glog.Errorf("Error closing connection to kubelet: %v", err) } }(conn) if err != nil { glog.Errorf("Couldn't connect to kubelet! (Reason: %s)", err) return err } glog.V(0).Info("Connected to kubelet") client := pluginapi.NewRegistrationClient(conn) _, err = client.Register(ctx, &pluginapi.RegisterRequest{ Version: pluginapi.Version, Endpoint: path.Base(necdp.socketPath()), ResourceName: resourceName, }) return err } // Ensure that the gRPC server of the device plugin is ready to serve. func (necdp *NitroEnclavesCPUDevicePlugin) waitForServerReady(timeout int) error { for i := 0; i < timeout; i++ { info := necdp.server.GetServiceInfo() if len(info) >= 1 { return nil } time.Sleep(time.Second) } return errors.New("gRPC server initialization timed out") } // Allocate is called during container creation so that the Device // Plugin can run device specific operations and instruct Kubelet // of the steps to make the Device available in the container func (necdp *NitroEnclavesCPUDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { responses := pluginapi.AllocateResponse{} for _, req := range reqs.ContainerRequests { responses.ContainerResponses = append(responses.ContainerResponses, &pluginapi.ContainerAllocateResponse{ Envs: map[string]string{ "NITRO_ENCLAVES_CPUS": strconv.Itoa(len(req.DevicesIDs)), }, }) } return &responses, nil } // GetDevicePluginOptions returns options to be communicated with Device Manager. func (*NitroEnclavesCPUDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { return &pluginapi.DevicePluginOptions{}, nil } // GetPreferredAllocation returns a preferred set of devices to allocate // from a list of available ones. The resulting preferred allocation is not // guaranteed to be the allocation ultimately performed by the // devicemanager. It is only designed to help the devicemanager make a more // informed allocation decision when possible. func (*NitroEnclavesCPUDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { return &pluginapi.PreferredAllocationResponse{}, nil } // ListAndWatch returns a stream of List of Devices // Whenever a Device state change or a Device disappears, ListAndWatch // returns the new list func (necdp *NitroEnclavesCPUDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { err := s.Send(&pluginapi.ListAndWatchResponse{Devices: necdp.devices}) if err != nil { return err } <-necdp.stop return nil } // PreStartContainer is called, if indicated by Device Plugin during registration phase, // before each container start. Device plugin can run device specific operations // such as resetting the device before making devices available to the container. func (necdp *NitroEnclavesCPUDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { return &pluginapi.PreStartContainerResponse{}, nil } func (necdp *NitroEnclavesCPUDevicePlugin) releaseResources() { necdp.server = nil // check if socketPath does exist and delete otherwise do nothing _, err := os.Stat(necdp.socketPath()) if err == nil { err = os.Remove(necdp.socketPath()) if err != nil { glog.Errorf("Error removing socket file: %s", err) } } } // Start device plugin server func (necdp *NitroEnclavesCPUDevicePlugin) Start() error { necdp.releaseResources() glog.V(0).Info("Starting Nitro Enclaves CPU device plugin server...") sock, err := net.Listen("unix", necdp.socketPath()) if err != nil { glog.Error("Error while creating socket: ", necdp.socketPath()) return err } necdp.server = grpc.NewServer([]grpc.ServerOption{}...) pluginapi.RegisterDevicePluginServer(necdp.server, necdp) go func() { err := necdp.server.Serve(sock) if err != nil { if necdp.stop != nil { glog.Errorf("Error while serving device plugin: %v", err) close(necdp.stop) } } }() err = necdp.waitForServerReady(devicePluginServerReadyTimeout) if err != nil { return err } if err = necdp.register(pluginapi.KubeletSocket, necdp.ResourceName()); err != nil { glog.Errorf("Error while registering cpu device plugin with kubelet! (Reason: %s)", err) necdp.Stop() return err } glog.V(0).Info("Registered cpu device plugin with Kubelet: ", necdp.ResourceName()) return nil } // Stop device plugin server func (necdp *NitroEnclavesCPUDevicePlugin) Stop() { close(necdp.stop) if necdp.server != nil { necdp.server.Stop() necdp.releaseResources() necdp.server = nil } glog.V(0).Infof("CPU device plugin stopped. (Socket: %s)", necdp.socketPath()) } // NewNitroEnclavesCPUDevicePlugin returns an initialized NitroEnclavesCPUDevicePlugin func NewNitroEnclavesCPUDevicePlugin(config *config.PluginConfig) *NitroEnclavesCPUDevicePlugin { if err := config.Validate(); err != nil { glog.Errorf("invalid CPU plugin config: %v", err) } glog.V(0).Infof("Initializing Nitro Enclaves CPU device plugin with following params: %v", config) // create a virtual device for each 'offline' cpu on the kubernetes worker. An offline CPU can be considered a // CPU that is not in use by the host OS and has thus been allocated by the AWS Nitro Enclave allocation service. var devs []*pluginapi.Device if config.EnclaveCPUAdvertisement { data, err := os.ReadFile(deviceOfflineCPUsPath) if err != nil { glog.V(0).Infof("Error reading offline CPU file: %v", err) // if error was thrown in read CPU file step, set data to empty string to have // determineAdvisableCPUs set availableCPUsOnInstance to 0 data = []byte("") } availableCPUsOnInstance, err := determineAdvisableCPUs(string(data)) if err != nil { glog.V(0).Infof("Error while determining advisable CPUs on the instance: %v", err) availableCPUsOnInstance = 0 } for i := 0; i < availableCPUsOnInstance; i++ { devs = append(devs, &pluginapi.Device{ ID: generateEnclaveCPUID(deviceName), Health: pluginapi.Healthy, }) } glog.V(0).Infof("Reserved CPUs for encalves added: %v", availableCPUsOnInstance) } return &NitroEnclavesCPUDevicePlugin{ devices: devs, stop: make(chan interface{}), } }