in pkg/gpu/nvidia/manager.go [432:539]
// Serve runs the device-plugin gRPC server on the unix socket
// path.Join(pMountPath, pluginEndpoint) and blocks until a value is received
// from ngm.stop.
//
// If path.Join(pMountPath, kEndpoint) exists when Serve starts, the plugin
// registers with the kubelet through RegisterWithV1Beta1Kubelet every time the
// server is (re)started. The server is stopped and started again when:
//   - the plugin socket file can no longer be Lstat'ed,
//   - hasAdditionalGPUsInstalled reports new devices (the restart waits until
//     discoverGPUs succeeds), or
//   - the kubelet socket is created again, which indicates a kubelet restart.
//
// Failures to create the directory watcher, to listen on the plugin socket, or
// to register with the kubelet are fatal (glog.Fatal*). On a stop request the
// running server is stopped, ngm.stop is closed and Serve returns; the close
// presumably acknowledges the stop request to its sender — confirm against the
// code that sends on ngm.stop.
func (ngm *nvidiaGPUManager) Serve(pMountPath, kEndpoint, pluginEndpoint string) {
	registerWithKubelet := false
	// Check if the unix socket device-plugin/kubelet.sock is at the host path.
	// This is decided once; a kubelet socket that only appears later is not
	// used for registration.
	kubeletEndpointPath := path.Join(pMountPath, kEndpoint)
	if _, err := os.Stat(kubeletEndpointPath); err == nil {
		glog.Infof("registered with kubelet, will use beta API\n")
		registerWithKubelet = true
	} else {
		glog.Infof("no kubelet.sock to register.\n")
	}
	pluginEndpointPath := path.Join(pMountPath, pluginEndpoint)

	// Create a watcher to watch /device-plugin directory. Ignoring the error
	// here would leave a nil watcher that panics on first use below.
	watcher, err := util.Files(pMountPath)
	if err != nil {
		glog.Fatalf("starting filesystem watcher on %s failed: %v", pMountPath, err)
	}
	defer watcher.Close()
	glog.Info("Starting filesystem watcher.")

	// The tickers live for the whole of Serve. Creating them on every restart
	// with a deferred Stop kept every old ticker running until Serve returned.
	gpuCheck := time.NewTicker(gpuCheckInterval)
	defer gpuCheck.Stop()
	pluginSocketCheck := time.NewTicker(pluginSocketCheckInterval)
	defer pluginSocketCheck.Stop()

	for {
		// Non-blocking check for a stop request before (re)starting the server.
		select {
		case <-ngm.stop:
			close(ngm.stop)
			return
		default:
		}

		glog.Infof("starting device-plugin server at: %s\n", pluginEndpointPath)
		lis, err := net.Listen("unix", pluginEndpointPath)
		if err != nil {
			glog.Fatalf("starting device-plugin server failed: %v", err)
		}
		ngm.socket = pluginEndpointPath
		ngm.grpcServer = grpc.NewServer()
		// Registers the supported versions of service.
		pluginbeta := &pluginServiceV1Beta1{ngm: ngm}
		pluginbeta.RegisterService()

		// wg tracks the serving goroutine so that a restart (or return) only
		// happens after the previous grpcServer.Serve call has returned.
		var wg sync.WaitGroup
		wg.Add(1)
		// Starts device plugin service.
		go func() {
			defer wg.Done()
			// Blocking call to accept incoming connections.
			err := ngm.grpcServer.Serve(lis)
			glog.Errorf("device-plugin server stopped serving: %v", err)
		}()

		if registerWithKubelet {
			// Wait till the grpcServer is ready to serve services.
			// NOTE(review): GetServiceInfo lists registered services, and
			// RegisterService above presumably registers them before Serve
			// starts, so this loop likely exits immediately — confirm.
			for len(ngm.grpcServer.GetServiceInfo()) <= 0 {
				time.Sleep(1 * time.Second)
			}
			glog.Infoln("device-plugin server started serving")
			// Registers with Kubelet.
			err = RegisterWithV1Beta1Kubelet(kubeletEndpointPath, pluginEndpoint, resourceName)
			if err != nil {
				ngm.grpcServer.Stop()
				wg.Wait()
				glog.Fatal(err)
			}
			glog.Infoln("device-plugin registered with the kubelet")
		}

		// Monitor for conditions that require stopping the gRPC server and
		// starting the whole thing again, and for stop requests.
	statusCheck:
		for {
			select {
			// Honour stop requests while serving; otherwise a stop is only
			// noticed after some other event forces a restart.
			case <-ngm.stop:
				glog.Infof("stopping device-plugin server at: %s\n", pluginEndpointPath)
				ngm.grpcServer.Stop()
				wg.Wait()
				close(ngm.stop)
				return
			// Restart the device plugin if plugin endpoint file disappears.
			case <-pluginSocketCheck.C:
				if _, err := os.Lstat(pluginEndpointPath); err != nil {
					glog.Infof("stopping device-plugin server at: %s\n", pluginEndpointPath)
					glog.Errorln(err)
					ngm.grpcServer.Stop()
					break statusCheck
				}
			// Restart the device plugin if additional GPUs were installed.
			case <-gpuCheck.C:
				if ngm.hasAdditionalGPUsInstalled() {
					ngm.grpcServer.Stop()
					// Retry discovery until it succeeds, pausing between
					// attempts so a persistent failure does not busy-spin.
					for {
						err := ngm.discoverGPUs()
						if err == nil {
							break statusCheck
						}
						glog.Errorf("discovering GPUs failed, retrying: %v", err)
						time.Sleep(1 * time.Second)
					}
				}
			// Restart the device plugin if kubelet socket gets recreated, which indicates a kubelet restart.
			case event := <-watcher.Events:
				if event.Name == kubeletEndpointPath && event.Op&fsnotify.Create == fsnotify.Create {
					glog.Infof(" %s recreated, stopping device-plugin server", kubeletEndpointPath)
					ngm.grpcServer.Stop()
					break statusCheck
				}
			// Log any other fs errors. This will not induce a device plugin restart.
			case err := <-watcher.Errors:
				glog.Infof("inotify: %s", err)
			}
		}
		// Make sure the previous Serve call has returned before restarting.
		wg.Wait()
	}
}