func()

in pkg/gpu/nvidia/manager.go [432:539]


// Serve runs the device-plugin gRPC server on the unix socket
// pMountPath/pluginEndpoint and keeps restarting it until a value is received
// on ngm.stop.
//
// If the kubelet socket pMountPath/kEndpoint exists when Serve is called, the
// plugin registers itself through RegisterWithV1Beta1Kubelet every time the
// server is (re)started. A running server is stopped and started again when:
//   - the plugin socket file can no longer be lstat'ed,
//   - hasAdditionalGPUsInstalled reports true (the restart happens once
//     discoverGPUs succeeds), or
//   - a create event for the kubelet socket path is seen by the watcher.
//
// ngm.stop is only polled between restarts, not while a running server is
// being monitored. When a value is received, Serve closes ngm.stop and
// returns.
//
// Failing to create the filesystem watcher, to listen on the plugin socket,
// or to register with the kubelet terminates the process via glog.Fatal.
func (ngm *nvidiaGPUManager) Serve(pMountPath, kEndpoint, pluginEndpoint string) {
	registerWithKubelet := false
	// Check if the unix socket device-plugin/kubelet.sock is at the host path.
	kubeletEndpointPath := path.Join(pMountPath, kEndpoint)
	if _, err := os.Stat(kubeletEndpointPath); err == nil {
		glog.Infof("registered with kubelet, will use beta API\n")
		registerWithKubelet = true
	} else {
		glog.Infof("no kubelet.sock to register.\n")
	}
	// The plugin socket path does not change between restarts.
	pluginEndpointPath := path.Join(pMountPath, pluginEndpoint)

	// Create a watcher to watch /device-plugin directory. Its events are the
	// only way this function notices a kubelet socket re-creation, so a
	// failure here is reported instead of being dereferenced later.
	watcher, err := util.Files(pMountPath)
	if err != nil {
		glog.Fatalf("creating filesystem watcher for %s failed: %v", pMountPath, err)
	}
	defer watcher.Close()
	glog.Info("Starting filesystem watcher.")

	for {
		select {
		case <-ngm.stop:
			close(ngm.stop)
			return
		default:
			glog.Infof("starting device-plugin server at: %s\n", pluginEndpointPath)
			lis, err := net.Listen("unix", pluginEndpointPath)
			if err != nil {
				glog.Fatalf("starting device-plugin server failed: %v", err)
			}
			ngm.socket = pluginEndpointPath
			ngm.grpcServer = grpc.NewServer()

			// Registers the supported versions of service.
			pluginbeta := &pluginServiceV1Beta1{ngm: ngm}
			pluginbeta.RegisterService()

			var wg sync.WaitGroup
			wg.Add(1)
			// Starts device plugin service.
			go func() {
				defer wg.Done()
				// Blocking call to accept incoming connections.
				err := ngm.grpcServer.Serve(lis)
				glog.Errorf("device-plugin server stopped serving: %v", err)
			}()

			if registerWithKubelet {
				// Wait till the grpcServer is ready to serve services.
				for len(ngm.grpcServer.GetServiceInfo()) <= 0 {
					time.Sleep(1 * time.Second)
				}
				glog.Infoln("device-plugin server started serving")
				// Registers with Kubelet.
				err = RegisterWithV1Beta1Kubelet(kubeletEndpointPath, pluginEndpoint, resourceName)
				if err != nil {
					ngm.grpcServer.Stop()
					wg.Wait()
					glog.Fatal(err)
				}
				glog.Infoln("device-plugin registered with the kubelet")
			}

			// This is checking if the plugin socket was deleted
			// and also if there are additional GPU devices installed.
			// If so, stop the grpc server and start the whole thing again.
			gpuCheck := time.NewTicker(gpuCheckInterval)
			pluginSocketCheck := time.NewTicker(pluginSocketCheckInterval)
		statusCheck:
			for {
				select {
				// Restart the device plugin if plugin endpoint file disappears.
				case <-pluginSocketCheck.C:
					if _, err := os.Lstat(pluginEndpointPath); err != nil {
						glog.Infof("stopping device-plugin server at: %s\n", pluginEndpointPath)
						glog.Errorln(err)
						ngm.grpcServer.Stop()
						break statusCheck
					}
				// Restart the device plugin if additional GPUs were installed.
				case <-gpuCheck.C:
					if ngm.hasAdditionalGPUsInstalled() {
						ngm.grpcServer.Stop()
						// Retry discovery until it succeeds; pause between
						// attempts so a persistent failure does not spin a CPU.
						for {
							err := ngm.discoverGPUs()
							if err == nil {
								break statusCheck
							}
							glog.Errorf("discovering GPUs failed, retrying: %v", err)
							time.Sleep(1 * time.Second)
						}
					}
				// Restart the device plugin if kubelet socket gets recreated, which indicates a kubelet restart.
				case event := <-watcher.Events:
					if event.Name == kubeletEndpointPath && event.Op&fsnotify.Create == fsnotify.Create {
						glog.Infof(" %s recreated, stopping device-plugin server", kubeletEndpointPath)
						ngm.grpcServer.Stop()
						break statusCheck
					}
				// Log any other fs errors. This will not induce a device plugin restart.
				case err := <-watcher.Errors:
					glog.Infof("inotify: %s", err)
				}
			}
			// Stop the tickers now rather than with defer: a defer would only
			// run when Serve returns, leaking two tickers per restart.
			gpuCheck.Stop()
			pluginSocketCheck.Stop()
			// Wait for the serving goroutine to exit before restarting.
			wg.Wait()
		}
	}
}