pkg/gpu/nvidia/manager.go
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nvidia
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"path"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/container-engine-accelerators/pkg/gpu/nvidia/nvmlutil"
"github.com/GoogleCloudPlatform/container-engine-accelerators/pkg/gpu/nvidia/util"
"github.com/NVIDIA/go-nvml/pkg/nvml"
"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"github.com/GoogleCloudPlatform/container-engine-accelerators/pkg/gpu/nvidia/gpusharing"
"github.com/GoogleCloudPlatform/container-engine-accelerators/pkg/gpu/nvidia/mig"
)
const (
// All NVIDIA GPU cards should be mounted with nvidiactl and nvidia-uvm.
// If the driver is installed correctly, these two devices will be present.
nvidiaCtlDevice = "nvidiactl"
nvidiaUVMDevice = "nvidia-uvm"
// Optional devices.
nvidiaUVMToolsDevice = "nvidia-uvm-tools"
nvidiaModesetDevice = "nvidia-modeset"
nvidiaDeviceRE = `^nvidia[0-9]*$`
gpuCheckInterval = 10 * time.Second
pluginSocketCheckInterval = 1 * time.Second
nvidiaMpsDir = "/tmp/nvidia-mps"
mpsControlBin = "/usr/local/nvidia/bin/nvidia-cuda-mps-control"
mpsActiveThreadCmd = "get_default_active_thread_percentage"
mpsMemLimitEnv = "CUDA_MPS_PINNED_DEVICE_MEM_LIMIT"
mpsThreadLimitEnv = "CUDA_MPS_ACTIVE_THREAD_PERCENTAGE"
)
var (
resourceName = "nvidia.com/gpu"
pciDevicesRoot = "/sys/bus/pci/devices"
)
// GPUConfig stores the settings used to configure the GPUs on a node.
type GPUConfig struct {
GPUPartitionSize string
// MaxTimeSharedClientsPerGPU is the number of time-shared GPU resources to expose for each physical GPU.
// Deprecated in favor of GPUSharingConfig.
MaxTimeSharedClientsPerGPU int
// GPUSharingConfig informs how GPUs on this node can be shared between containers.
GPUSharingConfig GPUSharingConfig
// HealthCriticalXid lists the Xid error codes that will set the node to unhealthy.
HealthCriticalXid []int
}
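// GPUSharingConfig specifies how the GPUs on this node can be shared between containers.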
type GPUSharingConfig struct {
// GPUSharingStrategy is the type of sharing strategy to enable on this node. Values are "time-sharing" or "mps".
GPUSharingStrategy gpusharing.GPUSharingStrategy
// MaxSharedClientsPerGPU is the maximum number of clients that are allowed to share a single GPU.
MaxSharedClientsPerGPU int
}
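// AddDefaultsAndValidate applies defaults to the GPU sharing configuration, validates it,
// and maps the deprecated MaxTimeSharedClientsPerGPU field onto GPUSharingConfig.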
func (config *GPUConfig) AddDefaultsAndValidate() error {
if config.MaxTimeSharedClientsPerGPU > 0 {
if config.GPUSharingConfig.GPUSharingStrategy != "" || config.GPUSharingConfig.MaxSharedClientsPerGPU > 0 {
glog.Infof("Both MaxTimeSharedClientsPerGPU and GPUSharingConfig are set, use the value of MaxTimeSharedClientsPerGPU")
}
config.GPUSharingConfig.GPUSharingStrategy = gpusharing.TimeSharing
config.GPUSharingConfig.MaxSharedClientsPerGPU = config.MaxTimeSharedClientsPerGPU
} else {
switch config.GPUSharingConfig.GPUSharingStrategy {
case gpusharing.TimeSharing, gpusharing.MPS:
if config.GPUSharingConfig.MaxSharedClientsPerGPU <= 0 {
return fmt.Errorf("MaxSharedClientsPerGPU should be > 0 for time-sharing or mps GPU sharing strategies")
}
case gpusharing.Undefined:
if config.GPUSharingConfig.MaxSharedClientsPerGPU > 0 {
return fmt.Errorf("GPU sharing strategy needs to be specified when MaxSharedClientsPerGPU > 0")
}
default:
return fmt.Errorf("invalid GPU Sharing strategy: %v, should be one of time-sharing or mps", config.GPUSharingConfig.GPUSharingStrategy)
}
}
gpusharing.SharingStrategy = config.GPUSharingConfig.GPUSharingStrategy
return nil
}
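// AddHealthCriticalXid reads the XID_CONFIG environment variable, expected to be a comma-separated
// list of Xid error codes (for example "48,63,64"), and stores them in HealthCriticalXid.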
func (config *GPUConfig) AddHealthCriticalXid() error {
xidConfig := os.Getenv("XID_CONFIG")
if len(xidConfig) == 0 {
glog.Infof("There is no Xid config specified ")
return nil
}
glog.Infof("Detect HealthCriticalXid : %s ", xidConfig)
xidStrs := strings.Split(xidConfig, ",")
xidArray := make([]int, len(xidStrs))
var err error
for i := range xidArray {
xidStr := strings.TrimSpace(xidStrs[i])
xidArray[i], err = strconv.Atoi(xidStr)
if err != nil {
return fmt.Errorf("invalid HealthCriticalXid input: %v", err)
}
}
config.HealthCriticalXid = xidArray
return nil
}
// nvidiaGPUManager manages nvidia gpu devices.
type nvidiaGPUManager struct {
devDirectory string
mountPaths []pluginapi.Mount
defaultDevices []string
devices map[string]pluginapi.Device
grpcServer *grpc.Server
socket string
stop chan bool
devicesMutex sync.Mutex
nvidiaCtlDevicePath string
nvidiaUVMDevicePath string
gpuConfig GPUConfig
migDeviceManager mig.DeviceManager
Health chan pluginapi.Device
totalMemPerGPU uint64 // Total memory available per GPU (in bytes)
}
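// NewNvidiaGPUManager returns an nvidiaGPUManager that manages the GPU devices under devDirectory with the given mount paths and GPU configuration.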
func NewNvidiaGPUManager(devDirectory, procDirectory string, mountPaths []pluginapi.Mount, gpuConfig GPUConfig) *nvidiaGPUManager {
return &nvidiaGPUManager{
devDirectory: devDirectory,
mountPaths: mountPaths,
devices: make(map[string]pluginapi.Device),
stop: make(chan bool),
nvidiaCtlDevicePath: path.Join(devDirectory, nvidiaCtlDevice),
nvidiaUVMDevicePath: path.Join(devDirectory, nvidiaUVMDevice),
gpuConfig: gpuConfig,
migDeviceManager: mig.NewDeviceManager(devDirectory, procDirectory),
Health: make(chan pluginapi.Device),
}
}
// ListPhysicalDevices lists all physical GPU devices (including partitions) available on this node.
func (ngm *nvidiaGPUManager) ListPhysicalDevices() map[string]pluginapi.Device {
if ngm.gpuConfig.GPUPartitionSize == "" {
return ngm.devices
}
return ngm.migDeviceManager.ListGPUPartitionDevices()
}
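// ListHealthCriticalXid returns the Xid error codes configured to set the node to unhealthy.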
func (ngm *nvidiaGPUManager) ListHealthCriticalXid() []int {
return ngm.gpuConfig.HealthCriticalXid
}
// ListDevices lists all GPU devices available on this node.
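// When GPU sharing is enabled, each physical device is expanded into MaxSharedClientsPerGPU virtual
// devices named <physicalID>/vgpu<N>, each inheriting the health status of its underlying physical device.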
func (ngm *nvidiaGPUManager) ListDevices() map[string]pluginapi.Device {
physicalGPUDevices := ngm.ListPhysicalDevices()
switch {
case ngm.gpuConfig.GPUSharingConfig.MaxSharedClientsPerGPU > 0:
virtualGPUDevices := map[string]pluginapi.Device{}
for _, device := range physicalGPUDevices {
for i := 0; i < ngm.gpuConfig.GPUSharingConfig.MaxSharedClientsPerGPU; i++ {
virtualDeviceID := fmt.Sprintf("%s/vgpu%d", device.ID, i)
// When sharing GPUs, the virtual GPU device will inherit the health status from its underlying physical GPU device.
virtualGPUDevices[virtualDeviceID] = pluginapi.Device{ID: virtualDeviceID, Health: device.Health, Topology: device.Topology}
}
}
return virtualGPUDevices
default:
return physicalGPUDevices
}
}
// DeviceSpec returns the device spec that includes the list of devices to allocate for a deviceID.
func (ngm *nvidiaGPUManager) DeviceSpec(deviceID string) ([]pluginapi.DeviceSpec, error) {
deviceSpecs := make([]pluginapi.DeviceSpec, 0)
// With GPU sharing, the input deviceID will be a virtual Device ID.
// We need to map it to the corresponding physical device ID.
if ngm.gpuConfig.GPUSharingConfig.MaxSharedClientsPerGPU > 0 {
physicalDeviceID, err := gpusharing.VirtualToPhysicalDeviceID(deviceID)
if err != nil {
return nil, err
}
deviceID = physicalDeviceID
}
if ngm.gpuConfig.GPUPartitionSize == "" {
dev, ok := ngm.devices[deviceID]
if !ok {
return deviceSpecs, fmt.Errorf("invalid allocation request with non-existing device %s", deviceID)
}
if dev.Health != pluginapi.Healthy {
return deviceSpecs, fmt.Errorf("invalid allocation request with unhealthy device %s", deviceID)
}
deviceSpecs = append(deviceSpecs, pluginapi.DeviceSpec{
HostPath: path.Join(ngm.devDirectory, deviceID),
ContainerPath: path.Join(ngm.devDirectory, deviceID),
Permissions: "mrw",
})
return deviceSpecs, nil
}
return ngm.migDeviceManager.DeviceSpec(deviceID)
}
// discoverGPUs discovers all NVIDIA GPU devices available on the local node using NVML and registers them as healthy.
func (ngm *nvidiaGPUManager) discoverGPUs() error {
if nvmlutil.NvmlDeviceInfo == nil {
nvmlutil.NvmlDeviceInfo = &nvmlutil.DeviceInfo{}
}
devicesCount, ret := nvmlutil.NvmlDeviceInfo.DeviceCount()
if ret != nvml.SUCCESS {
return fmt.Errorf("failed to get devices count: %v", nvml.ErrorString(ret))
}
for i := 0; i < devicesCount; i++ {
device, ret := nvmlutil.NvmlDeviceInfo.DeviceHandleByIndex(i)
if ret != nvml.SUCCESS {
return fmt.Errorf("failed to get the device handle for index %d: %v", i, nvml.ErrorString(ret))
}
minor, ret := nvmlutil.NvmlDeviceInfo.MinorNumber(device)
if ret != nvml.SUCCESS {
return fmt.Errorf("failed to get the minor number for device with index %d: %v", i, nvml.ErrorString(ret))
}
deviceName := fmt.Sprintf("nvidia%d", minor)
glog.V(3).Infof("Found Nvidia GPU %q\n", deviceName)
topologyInfo, err := nvmlutil.Topology(device, pciDevicesRoot)
if err != nil {
glog.Errorf("unable to get topology for device with index %d: %v", i, err)
}
ngm.SetDeviceHealth(deviceName, pluginapi.Healthy, topologyInfo)
}
return nil
}
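// hasAdditionalGPUsInstalled reports whether more GPU device nodes exist under devDirectory than are currently registered.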
func (ngm *nvidiaGPUManager) hasAdditionalGPUsInstalled() bool {
ngm.devicesMutex.Lock()
originalDeviceCount := len(ngm.devices)
ngm.devicesMutex.Unlock()
deviceCount, err := ngm.discoverNumGPUs()
if err != nil {
glog.Errorln(err)
return false
}
if deviceCount > originalDeviceCount {
glog.Infof("Found %v GPUs, while only %v are registered. Stopping device-plugin server.", deviceCount, originalDeviceCount)
return true
}
return false
}
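// discoverNumGPUs counts the NVIDIA GPU device nodes (matching nvidia[0-9]*) under devDirectory.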
func (ngm *nvidiaGPUManager) discoverNumGPUs() (int, error) {
reg := regexp.MustCompile(nvidiaDeviceRE)
deviceCount := 0
files, err := ioutil.ReadDir(ngm.devDirectory)
if err != nil {
return deviceCount, err
}
for _, f := range files {
if f.IsDir() {
continue
}
if reg.MatchString(f.Name()) {
deviceCount++
}
}
return deviceCount, nil
}
// isMpsHealthy checks whether the MPS control daemon is running and healthy on the node.
func (ngm *nvidiaGPUManager) isMpsHealthy() error {
var out bytes.Buffer
reader, writer := io.Pipe()
defer writer.Close()
defer reader.Close()
mpsCmd := exec.Command(mpsControlBin)
mpsCmd.Stdin = reader
mpsCmd.Stdout = &out
err := mpsCmd.Start()
if err != nil {
return fmt.Errorf("failed to start NVIDIA MPS health check command: %v", err)
}
writer.Write([]byte(mpsActiveThreadCmd))
writer.Close()
err = mpsCmd.Wait()
if err != nil {
return fmt.Errorf("failed to health check NVIDIA MPS: %v", err)
}
reader.Close()
glog.Infof("MPS is healthy, active thread percentage = %s", out.String())
return nil
}
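// Envs returns the environment variables to set in a container for the requested number of devices.
// Under MPS sharing, it sets CUDA_MPS_ACTIVE_THREAD_PERCENTAGE and CUDA_MPS_PINNED_DEVICE_MEM_LIMIT
// proportionally to the requested share; for example, assuming a 16 GiB GPU and MaxSharedClientsPerGPU = 4,
// a request for 2 devices yields a thread percentage of 50 and a memory limit of 0=8192M.
// For other strategies it returns an empty map.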
func (ngm *nvidiaGPUManager) Envs(numDevicesRequested int) map[string]string {
if ngm.gpuConfig.GPUSharingConfig.GPUSharingStrategy == gpusharing.MPS {
activeThreadLimit := numDevicesRequested * 100 / ngm.gpuConfig.GPUSharingConfig.MaxSharedClientsPerGPU
memoryLimitBytes := uint64(numDevicesRequested) * ngm.totalMemPerGPU / uint64(ngm.gpuConfig.GPUSharingConfig.MaxSharedClientsPerGPU)
return map[string]string{
mpsThreadLimitEnv: strconv.Itoa(activeThreadLimit),
// The mpsMemLimitEnv is the GPU memory limit per container, e.g. 0=8192M.
// 0 is the device ID on which this container resides.
// Since an MPS container can only land on one GPU, it is always device 0 relative to the container.
mpsMemLimitEnv: fmt.Sprintf("0=%dM", memoryLimitBytes/(1024*1024)),
}
}
return map[string]string{}
}
// SetDeviceHealth sets the health status for a GPU device, or for a GPU partition if MIG is enabled.
func (ngm *nvidiaGPUManager) SetDeviceHealth(name string, health string, topology *pluginapi.TopologyInfo) {
ngm.devicesMutex.Lock()
defer ngm.devicesMutex.Unlock()
reg := regexp.MustCompile(nvidiaDeviceRE)
if reg.MatchString(name) {
ngm.devices[name] = pluginapi.Device{ID: name, Health: health, Topology: topology}
} else {
ngm.migDeviceManager.SetDeviceHealth(name, health, topology)
}
}
// CheckDevicePaths checks that the nvidiactl and nvidia-uvm device paths exist.
// It can be used to verify that the driver has been installed correctly.
func (ngm *nvidiaGPUManager) CheckDevicePaths() error {
if _, err := os.Stat(ngm.nvidiaCtlDevicePath); err != nil {
return err
}
if _, err := os.Stat(ngm.nvidiaUVMDevicePath); err != nil {
return err
}
return nil
}
// Start discovers Nvidia GPU devices and sets up the device access environment.
func (ngm *nvidiaGPUManager) Start() error {
ngm.defaultDevices = []string{ngm.nvidiaCtlDevicePath, ngm.nvidiaUVMDevicePath}
nvidiaModesetDevicePath := path.Join(ngm.devDirectory, nvidiaModesetDevice)
if _, err := os.Stat(nvidiaModesetDevicePath); err == nil {
ngm.defaultDevices = append(ngm.defaultDevices, nvidiaModesetDevicePath)
}
nvidiaUVMToolsDevicePath := path.Join(ngm.devDirectory, nvidiaUVMToolsDevice)
if _, err := os.Stat(nvidiaUVMToolsDevicePath); err == nil {
ngm.defaultDevices = append(ngm.defaultDevices, nvidiaUVMToolsDevicePath)
}
if err := ngm.discoverGPUs(); err != nil {
return err
}
if ngm.gpuConfig.GPUPartitionSize != "" {
if err := ngm.migDeviceManager.Start(ngm.gpuConfig.GPUPartitionSize); err != nil {
return fmt.Errorf("failed to start mig device manager: %v", err)
}
}
if ngm.gpuConfig.GPUSharingConfig.GPUSharingStrategy == "mps" {
if err := ngm.isMpsHealthy(); err != nil {
return fmt.Errorf("NVIDIA MPS is not running on this node: %v", err)
}
ngm.mountPaths = append(ngm.mountPaths, pluginapi.Mount{HostPath: nvidiaMpsDir, ContainerPath: nvidiaMpsDir, ReadOnly: false})
var err error
ngm.totalMemPerGPU, err = totalMemPerGPU()
if err != nil {
return fmt.Errorf("failed to query total memory available per GPU: %v", err)
}
}
return nil
}
// totalMemPerGPU returns the total memory (in bytes) available on a GPU device, assuming all GPUs on the node are identical; it queries device 0.
func totalMemPerGPU() (uint64, error) {
count, ret := nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
return 0, fmt.Errorf("failed to enumerate devices: %v", nvml.ErrorString(ret))
}
if count <= 0 {
return 0, fmt.Errorf("no GPUs on node, count: %d", count)
}
device, ret := nvml.DeviceGetHandleByIndex(0)
if ret != nvml.SUCCESS {
return 0, fmt.Errorf("failed to query GPU with nvml: %v", nvml.ErrorString(ret))
}
memory, ret := device.GetMemoryInfo()
if ret != nvml.SUCCESS {
return 0, fmt.Errorf("failed to get GPU memory: %v", nvml.ErrorString(ret))
}
return memory.Total, nil
}
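// Serve starts the device plugin gRPC server at pluginEndpoint under pMountPath and, if the kubelet
// socket kEndpoint is present, registers the plugin with the kubelet. It restarts the server whenever
// the plugin socket disappears, additional GPUs are installed, or the kubelet socket is recreated.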
func (ngm *nvidiaGPUManager) Serve(pMountPath, kEndpoint, pluginEndpoint string) {
registerWithKubelet := false
// Check if the unix socket device-plugin/kubelet.sock is at the host path.
kubeletEndpointPath := path.Join(pMountPath, kEndpoint)
if _, err := os.Stat(kubeletEndpointPath); err == nil {
glog.Infof("registered with kubelet, will use beta API\n")
registerWithKubelet = true
} else {
glog.Infof("no kubelet.sock to register.\n")
}
// Create a watcher to watch /device-plugin directory.
watcher, err := util.Files(pMountPath)
if err != nil {
glog.Fatalf("failed to create filesystem watcher for %s: %v", pMountPath, err)
}
defer watcher.Close()
glog.Info("Starting filesystem watcher.")
for {
select {
case <-ngm.stop:
close(ngm.stop)
return
default:
{
pluginEndpointPath := path.Join(pMountPath, pluginEndpoint)
glog.Infof("starting device-plugin server at: %s\n", pluginEndpointPath)
lis, err := net.Listen("unix", pluginEndpointPath)
if err != nil {
glog.Fatalf("starting device-plugin server failed: %v", err)
}
ngm.socket = pluginEndpointPath
ngm.grpcServer = grpc.NewServer()
// Registers the supported versions of the service.
pluginbeta := &pluginServiceV1Beta1{ngm: ngm}
pluginbeta.RegisterService()
var wg sync.WaitGroup
wg.Add(1)
// Starts device plugin service.
go func() {
defer wg.Done()
// Blocking call to accept incoming connections.
err := ngm.grpcServer.Serve(lis)
glog.Errorf("device-plugin server stopped serving: %v", err)
}()
if registerWithKubelet {
// Wait until the grpcServer is ready to serve.
for len(ngm.grpcServer.GetServiceInfo()) == 0 {
time.Sleep(1 * time.Second)
}
glog.Infoln("device-plugin server started serving")
// Registers with Kubelet.
err = RegisterWithV1Beta1Kubelet(path.Join(pMountPath, kEndpoint), pluginEndpoint, resourceName)
if err != nil {
ngm.grpcServer.Stop()
wg.Wait()
glog.Fatal(err)
}
glog.Infoln("device-plugin registered with the kubelet")
}
// Periodically check whether the plugin socket was deleted or additional GPU devices were installed.
// If so, stop the gRPC server and restart the serving loop.
gpuCheck := time.NewTicker(gpuCheckInterval)
pluginSocketCheck := time.NewTicker(pluginSocketCheckInterval)
defer gpuCheck.Stop()
defer pluginSocketCheck.Stop()
statusCheck:
for {
select {
// Restart the device plugin if the plugin endpoint file disappears.
case <-pluginSocketCheck.C:
if _, err := os.Lstat(pluginEndpointPath); err != nil {
glog.Infof("stopping device-plugin server at: %s\n", pluginEndpointPath)
glog.Errorln(err)
ngm.grpcServer.Stop()
break statusCheck
}
// Restart the device plugin if additional GPUs are installed.
case <-gpuCheck.C:
if ngm.hasAdditionalGPUsInstalled() {
ngm.grpcServer.Stop()
for {
err := ngm.discoverGPUs()
if err == nil {
break statusCheck
}
glog.Errorf("failed to discover GPUs: %v; retrying", err)
time.Sleep(gpuCheckInterval)
}
}
// Restart the device plugin if kubelet socket gets recreated, which indicates a kubelet restart.
case event := <-watcher.Events:
if event.Name == kubeletEndpointPath && event.Op&fsnotify.Create == fsnotify.Create {
glog.Infof(" %s recreated, stopping device-plugin server", kubeletEndpointPath)
ngm.grpcServer.Stop()
break statusCheck
}
// Log any other filesystem errors. These will not trigger a device plugin restart.
case err := <-watcher.Errors:
glog.Infof("inotify: %s", err)
}
}
wg.Wait()
}
}
}
}
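// Stop removes the plugin socket, signals the Serve loop to exit, and closes the Health channel.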
func (ngm *nvidiaGPUManager) Stop() error {
glog.Infof("removing device plugin socket %s\n", ngm.socket)
if err := os.Remove(ngm.socket); err != nil && !os.IsNotExist(err) {
return err
}
ngm.stop <- true
<-ngm.stop
close(ngm.Health)
return nil
}