in plugins/internal/plugin/device_plugin.go [290:336]
// ListAndWatch is the streaming RPC through which the device list is reported
// to the Kubelet. It forces an initial refresh of the device watcher and then
// loops, sending a fresh device list on the stream every time the watcher
// publishes an update.
//
// The loop ends, returning nil, when p.stopListWatch fires or when the
// stream's context is done. If sending a device list on the stream fails, the
// wrapped send error is returned instead so the failure is not silently lost.
func (p *DevicePlugin) ListAndWatch(request *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
	// Force a device list refresh to ensure we have an initial list for the Kubelet
	p.logger.Info("ListAndWatch streaming RPC started, refreshing the device list")
	p.watcher.ForceRefresh()

	// Continue sending updates as our device list changes or until shutdown is requested
	for {
		select {
		case <-p.stopListWatch:
			p.logger.Info("Shutdown requested, stopping ListAndWatch streaming RPC")
			return nil

		case <-stream.Context().Done():
			p.logger.Info("Kubelet disconnect detected, stopping ListAndWatch streaming RPC")
			return nil

		case devices := <-p.watcher.Updates:
			p.logger.Infow("Received new device list", "devices", devices)

			// Store the device list under the mutex that guards currentDevices
			p.devicesMutex.Lock()
			p.currentDevices = devices
			p.devicesMutex.Unlock()

			// Convert the device discovery devices to Kubernetes device plugin API devices
			kubeletDevices := []*pluginapi.Device{}
			for _, device := range devices {
				// Advertise each device multiple times, as per our multitenancy setting.
				// Each advertised ID is the underlying device ID, a backslash, and the
				// zero-based index of the copy.
				for i := uint32(0); i < p.config.Multitenancy; i++ {
					kubeletDevices = append(kubeletDevices, &pluginapi.Device{
						ID:     fmt.Sprintf("%s\\%d", device.ID, i),
						Health: pluginapi.Healthy,
					})
				}
			}

			// Send the device list to the Kubelet. A failed send ends the RPC with
			// the error rather than continuing as though the list had been delivered.
			p.logger.Infow("Sending device list to Kubelet", "devices", kubeletDevices)
			if err := stream.Send(&pluginapi.ListAndWatchResponse{
				Devices: kubeletDevices,
			}); err != nil {
				return fmt.Errorf("sending device list to Kubelet: %w", err)
			}
		}
	}
}