func()

in plugins/internal/plugin/device_plugin.go [290:336]


// ListAndWatch implements the ListAndWatch streaming RPC of the device plugin API.
//
// It first forces a refresh of the device watcher so that an initial device list
// becomes available, then loops, forwarding every device list received on
// p.watcher.Updates to the stream. Each discovered device is advertised
// p.config.Multitenancy times, with the instance index appended to the device ID.
//
// The method returns nil when shutdown is requested via p.stopListWatch or when
// the stream's context is done. It returns a non-nil error if sending a device
// list over the stream fails.
func (p *DevicePlugin) ListAndWatch(request *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {

	// Force a device list refresh to ensure we have an initial list for the Kubelet
	p.logger.Info("ListAndWatch streaming RPC started, refreshing the device list")
	p.watcher.ForceRefresh()

	// Continue sending updates as our device list changes or until shutdown is requested
	for {
		select {

		case <-p.stopListWatch:
			p.logger.Info("Shutdown requested, stopping ListAndWatch streaming RPC")
			return nil

		case <-stream.Context().Done():
			p.logger.Info("Kubelet disconnect detected, stopping ListAndWatch streaming RPC")
			return nil

		case devices := <-p.watcher.Updates:
			p.logger.Infow("Received new device list", "devices", devices)

			// Store the device list
			p.devicesMutex.Lock()
			p.currentDevices = devices
			p.devicesMutex.Unlock()

			// Convert the device discovery devices to Kubernetes device plugin API devices
			kubeletDevices := []*pluginapi.Device{}
			for _, device := range devices {

				// Advertise each device multiple times, as per our multitenancy setting
				for i := uint32(0); i < p.config.Multitenancy; i += 1 {
					kubeletDevices = append(kubeletDevices, &pluginapi.Device{
						ID:     fmt.Sprintf("%s\\%d", device.ID, i),
						Health: pluginapi.Healthy,
					})
				}
			}

			// Send the device list to the Kubelet. A failed send means the stream is no
			// longer usable, so surface the error to the caller rather than silently
			// continuing as though the update had been delivered.
			p.logger.Infow("Sending device list to Kubelet", "devices", kubeletDevices)
			response := &pluginapi.ListAndWatchResponse{
				Devices: kubeletDevices,
			}
			if err := stream.Send(response); err != nil {
				return fmt.Errorf("sending device list to Kubelet: %w", err)
			}
		}
	}
}