pkg/nitro_enclaves_device_plugin/device_plugin.go (200 lines of code) (raw):
// Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package nitro_enclaves_device_plugin
import (
"errors"
"google.golang.org/grpc/credentials/insecure"
"k8s-ne-device-plugin/pkg/config"
"k8s-ne-device-plugin/pkg/nitro_enclaves_device_monitor"
"net"
"os"
"path"
"strconv"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
const (
deviceName = "nitro_enclaves"
devicePluginServerReadyTimeout = 10
)
var deviceIdCounter = 0
type IPluginDefinitions interface {
socketPath() string
devicePath() string
}
type NEPluginDefinitions struct {
IPluginDefinitions
}
func (n *NEPluginDefinitions) socketPath() string {
return pluginapi.DevicePluginPath + deviceName + ".sock"
}
func (n *NEPluginDefinitions) devicePath() string {
return "/dev/" + deviceName
}
func (nedp *NitroEnclavesDevicePlugin) ResourceName() string {
return "aws.ec2.nitro/" + deviceName
}
// NitroEnclavesDevicePlugin implements the Kubernetes device plugin API
type NitroEnclavesDevicePlugin struct {
dev []*pluginapi.Device
pdef IPluginDefinitions
stop chan interface{}
health chan *pluginapi.Device
server *grpc.Server
pluginapi.DevicePluginServer
nitro_enclaves_device_monitor.IBasicDevicePlugin
}
func generateDeviceID(deviceName string) string {
ctr := deviceIdCounter
deviceIdCounter++
return deviceName + "_" + strconv.Itoa(ctr)
}
func (nedp *NitroEnclavesDevicePlugin) releaseResources() {
nedp.server = nil
// check if socketPath does exist and delete otherwise do nothing
_, err := os.Stat(nedp.pdef.socketPath())
if err == nil {
err = os.Remove(nedp.pdef.socketPath())
if err != nil {
glog.Errorf("Error removing socket file: %s", err)
}
}
}
// Register the device plugin with Kubelet.
func (nedp *NitroEnclavesDevicePlugin) register(kubeletEndpoint, resourceName string) error {
glog.V(0).Infof("Attempting %v device plugin to connect to kubelet...", nedp.ResourceName())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
//lint:ignore SA1019 grpc.WithBlock is deprecated, not supported by grpc.NewClient
grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return net.DialUnix("unix", nil, &net.UnixAddr{Name: addr, Net: "unix"})
}),
}
//lint:ignore SA1019 grpc.DialContext is deprecated // todo replace by grpc.NewClient
conn, err := grpc.DialContext(
ctx,
kubeletEndpoint,
opts...,
)
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
glog.Errorf("Error closing connection to kubelet: %s", err)
}
}(conn)
if err != nil {
glog.Errorf("Couldn't connect to kubelet! (Reason: %s)", err)
return err
}
glog.V(0).Infof("Connected %v device plugin to kubelet", nedp.ResourceName())
client := pluginapi.NewRegistrationClient(conn)
_, err = client.Register(ctx, &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(nedp.pdef.socketPath()),
ResourceName: resourceName,
})
return err
}
// Ensure that the gRPC server of the device plugin is ready to serve.
func (nedp *NitroEnclavesDevicePlugin) waitForServerReady(timeout int) error {
for i := 0; i < timeout; i++ {
info := nedp.server.GetServiceInfo()
if len(info) >= 1 {
return nil
}
time.Sleep(time.Second)
}
return errors.New("gRPC server initialization timed out")
}
// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
func (nedp *NitroEnclavesDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
responses := pluginapi.AllocateResponse{}
for _, req := range reqs.ContainerRequests {
response := pluginapi.ContainerAllocateResponse{
Devices: []*pluginapi.DeviceSpec{
{
ContainerPath: nedp.pdef.devicePath(),
HostPath: nedp.pdef.devicePath(),
Permissions: "rw",
},
},
}
for _, id := range req.DevicesIDs {
glog.V(1).Info("Allocation request for device ID: ", id)
}
responses.ContainerResponses = append(responses.ContainerResponses, &response)
}
return &responses, nil
}
// GetDevicePluginOptions returns options to be communicated with Device Manager.
func (nedp *NitroEnclavesDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{}, nil
}
// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
func (*NitroEnclavesDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{}, nil
}
// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
func (nedp *NitroEnclavesDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
err := s.Send(&pluginapi.ListAndWatchResponse{Devices: nedp.dev})
if err != nil {
return err
}
//TODO: Device health check goes here
<-nedp.stop
return nil
}
// PreStartContainer is called, if indicated by Device Plugin during registration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container.
func (nedp *NitroEnclavesDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
return &pluginapi.PreStartContainerResponse{}, nil
}
// Start device plugin server
func (nedp *NitroEnclavesDevicePlugin) Start() error {
nedp.releaseResources()
glog.V(0).Info("Starting Nitro Enclaves device plugin server...")
sock, err := net.Listen("unix", nedp.pdef.socketPath())
if err != nil {
glog.Errorf("Error while creating socket: %v", nedp.pdef.socketPath())
return err
}
nedp.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterDevicePluginServer(nedp.server, nedp)
go func() {
err := nedp.server.Serve(sock)
if err != nil {
if nedp.stop != nil {
glog.Errorf("Error while serving device plugin: %v", err)
close(nedp.stop)
}
}
}()
err = nedp.waitForServerReady(devicePluginServerReadyTimeout)
if err != nil {
return err
}
if err = nedp.register(pluginapi.KubeletSocket, nedp.ResourceName()); err != nil {
glog.Errorf("Error while registering device plugin with kubelet! (Reason: %s)", err)
nedp.Stop()
return err
}
glog.V(0).Infof("Registered device plugin with Kubelet: %v", nedp.ResourceName())
return nil
}
// Stop device plugin server
func (nedp *NitroEnclavesDevicePlugin) Stop() {
if nedp.server != nil {
nedp.server.Stop()
nedp.releaseResources()
glog.V(0).Infof("Device plugin stopped. (Socket: %s)", nedp.pdef.socketPath())
}
}
// NewNitroEnclavesDevicePlugin returns an initialized NitroEnclavesDevicePlugin
func NewNitroEnclavesDevicePlugin(config *config.PluginConfig) *NitroEnclavesDevicePlugin {
if err := config.Validate(); err != nil {
glog.Errorf("invalid plugin config: %v", err)
}
glog.V(0).Infof("Initializing Nitro Enclaves device plugin with following params: %v", config)
// devs slice, determines the pluginapi.ListAndWatchResponse, which lets the kubelet know about the available/allocatable "aws.ec2.nitro/nitro_enclaves" devices
// in a k8s worker node. Number of devices, in this context does not represent number of "nitro_enclaves" device files present in the host,
// instead it can be interpreted as number pods that can share the same host device file. The same host device file "nitro_enclaves",
// can be mounted into multiple pods, which can be used to run an enclave.
// This lets us schedule 2 or more pods requiring nitro_enclaves device on the same k8s node/EC2 instance.
devs := []*pluginapi.Device{}
for i := 0; i < config.MaxEnclavesPerNode; i++ {
devs = append(devs, &pluginapi.Device{
ID: generateDeviceID(deviceName),
Health: pluginapi.Healthy,
})
}
glog.V(0).Infof("Enclave devices added: %v", config.MaxEnclavesPerNode)
return &NitroEnclavesDevicePlugin{
dev: devs,
pdef: &NEPluginDefinitions{},
stop: make(chan interface{}),
health: make(chan *pluginapi.Device),
}
}