in plugins/internal/plugin/device_plugin.go [64:131]
func NewDevicePlugin(pluginName string, pluginVersion string, resourceName string, filter discovery.DeviceFilter, config *PluginConfig, logger *zap.SugaredLogger) (*DevicePlugin, error) {
// Attempt to create a new DeviceWatcher
watcher, err := NewDeviceWatcher(
pluginVersion,
filter,
config.IncludeIntegrated,
config.IncludeDetachable,
config.AdditionalMounts,
config.AdditionalMountsWow64,
logger,
)
if err != nil {
return nil, err
}
// Verify that device watcher can successfully list devices
select {
case <-watcher.Updates:
logger.Info("Initial device list retrieved successfully")
case <-watcher.Errors:
watcher.Destroy()
return nil, fmt.Errorf("failed to perform device discovery: %v", err)
}
// Create a new device plugin instance with the device watcher
plugin := &DevicePlugin{
name: pluginName,
config: config,
endpoint: "",
endpointDeleted: nil,
resourceName: resourceName,
watcher: watcher,
currentDevices: []*discovery.Device{},
devicesMutex: sync.Mutex{},
logger: logger,
server: nil,
restart: make(chan struct{}, 1),
stopListWatch: nil,
Errors: make(chan error, 1),
}
// Forward any device watcher errors to the plugin's error channel
go func() {
for err := range plugin.watcher.Errors {
plugin.Errors <- err
}
}()
// Restart the plugin's gRPC server and perform plugin registration again in the event of a Kubelet restart
go func() {
for range plugin.restart {
// Restart the gRPC server with a new Unix socket filename since the Kubelet will delete the old one
if err := plugin.RestartServer(); err != nil {
plugin.Errors <- err
}
// Register the device plugin with the new Kubelet instance
if err := plugin.RegisterWithKubelet(); err != nil {
plugin.Errors <- err
}
}
}()
return plugin, nil
}