cns/deviceplugin/pluginmanager.go (94 lines of code) (raw):
package deviceplugin
import (
"context"
"sync"
"time"
"github.com/Azure/azure-container-networking/crd/multitenancy/api/v1alpha1"
"github.com/pkg/errors"
"go.uber.org/zap"
"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
// Defaults that NewPluginManager applies before any caller-supplied option runs.
const (
// defaultDevicePluginDirectory is the initial value of the devicePluginDirectory option.
defaultDevicePluginDirectory = "/var/lib/kubelet/device-plugins"
// defaultDeviceCheckInterval is the initial value of the deviceCheckInterval option.
defaultDeviceCheckInterval = 5 * time.Second
)
// pluginManagerOptions holds the tunable settings of a PluginManager. The values
// are forwarded to NewPlugin each time AddPlugin creates a plugin.
type pluginManagerOptions struct {
// devicePluginDirectory defaults to defaultDevicePluginDirectory.
devicePluginDirectory string
// kubeletSocket defaults to v1beta1.KubeletSocket.
kubeletSocket string
// deviceCheckInterval defaults to defaultDeviceCheckInterval.
deviceCheckInterval time.Duration
}
// pluginManagerOption mutates a pluginManagerOptions value; NewPluginManager
// applies each supplied option, in order, on top of the defaults.
type pluginManagerOption func(*pluginManagerOptions)
// PluginManagerSocketPrefix returns an option that replaces the device plugin
// directory (defaultDevicePluginDirectory unless overridden) with prefix.
func PluginManagerSocketPrefix(prefix string) func(*pluginManagerOptions) {
	setDirectory := func(cfg *pluginManagerOptions) {
		cfg.devicePluginDirectory = prefix
	}
	return setDirectory
}
// PluginManagerKubeletSocket returns an option that replaces the kubelet socket
// (v1beta1.KubeletSocket unless overridden) with socket.
func PluginManagerKubeletSocket(socket string) func(*pluginManagerOptions) {
	setSocket := func(cfg *pluginManagerOptions) {
		cfg.kubeletSocket = socket
	}
	return setSocket
}
// PluginDeviceCheckInterval returns an option that replaces the device check
// interval (defaultDeviceCheckInterval unless overridden) with i.
func PluginDeviceCheckInterval(i time.Duration) func(*pluginManagerOptions) {
	setInterval := func(cfg *pluginManagerOptions) {
		cfg.deviceCheckInterval = i
	}
	return setInterval
}
// PluginManager runs device plugins for vnet nics and ib nics.
// Plugins are registered with AddPlugin and started together by Run.
type PluginManager struct {
// Logger is the logger handed to every plugin created by AddPlugin.
Logger *zap.Logger
// plugins is the list of registered plugins, in the order they were added.
plugins []*Plugin
// socketWatcher is created once in NewPluginManager and passed to every plugin.
socketWatcher *SocketWatcher
// options holds the defaults merged with any caller-supplied options.
options pluginManagerOptions
// mu is held by AddPlugin and TrackDevices while they touch plugins.
mu sync.Mutex
}
// NewPluginManager builds a PluginManager with no plugins registered.
//
// The supplied logger is tagged with component=devicePlugin and shared with a
// freshly created SocketWatcher. Options start from the package defaults and
// each entry of opts is then applied in order, so later options win.
func NewPluginManager(l *zap.Logger, opts ...pluginManagerOption) *PluginManager {
	scoped := l.With(zap.String("component", "devicePlugin"))

	manager := &PluginManager{
		Logger:        scoped,
		socketWatcher: NewSocketWatcher(scoped),
		options: pluginManagerOptions{
			devicePluginDirectory: defaultDevicePluginDirectory,
			kubeletSocket:         v1beta1.KubeletSocket,
			deviceCheckInterval:   defaultDeviceCheckInterval,
		},
	}

	// Layer the caller's overrides on top of the defaults.
	for i := range opts {
		opts[i](&manager.options)
	}

	return manager
}
// AddPlugin creates a plugin for deviceType with the given deviceCount, using
// the manager's logger, socket watcher and options, and appends it to the
// manager's plugin list while holding pm.mu. It returns the manager itself so
// that calls can be chained.
func (pm *PluginManager) AddPlugin(deviceType v1alpha1.DeviceType, deviceCount int) *PluginManager {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	cfg := &pm.options
	// The device type string doubles as the plugin's first identifying argument.
	resourceName := string(deviceType)
	pm.plugins = append(pm.plugins, NewPlugin(
		pm.Logger,
		resourceName,
		pm.socketWatcher,
		cfg.devicePluginDirectory,
		deviceCount,
		deviceType,
		cfg.kubeletSocket,
		cfg.deviceCheckInterval,
	))

	return pm
}
// Run cleans up stale state for every registered plugin, then runs each plugin
// in its own goroutine with ctx and blocks until all of them have returned.
//
// The set of plugins is captured once, under pm.mu, when Run starts; plugins
// registered through AddPlugin afterwards are not started by this call. The
// lock is released before any plugin runs so TrackDevices and AddPlugin remain
// usable while Run is blocked.
//
// It returns a wrapped error if any plugin's CleanOldState fails (in which case
// no plugin is started), and nil once every plugin's Run has returned.
func (pm *PluginManager) Run(ctx context.Context) error {
	// Snapshot the plugin list under the same lock AddPlugin and TrackDevices
	// use, so a concurrent AddPlugin cannot race with the iterations below and
	// both loops operate on exactly the same set of plugins.
	pm.mu.Lock()
	plugins := make([]*Plugin, len(pm.plugins))
	copy(plugins, pm.plugins)
	pm.mu.Unlock()

	// clean up any leftover state from previous failed plugins
	// this can happen if the process crashes before it is able to clean up after itself
	for _, plugin := range plugins {
		if err := plugin.CleanOldState(); err != nil {
			return errors.Wrap(err, "error cleaning state from previous plugin process")
		}
	}

	var wg sync.WaitGroup
	for _, plugin := range plugins {
		wg.Add(1) //nolint:gomnd // in favor of readability
		// Pass the plugin as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(p *Plugin) {
			defer wg.Done()
			p.Run(ctx)
		}(plugin)
	}

	wg.Wait()
	return nil
}
// TrackDevices forwards count to the first registered plugin whose device type
// equals deviceType, via that plugin's UpdateDeviceCount. If no plugin matches,
// nothing happens. pm.mu is held for the duration of the call.
func (pm *PluginManager) TrackDevices(deviceType v1alpha1.DeviceType, count int) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	for i := range pm.plugins {
		candidate := pm.plugins[i]
		if candidate.deviceType != deviceType {
			continue
		}
		// Only the first match is updated.
		candidate.UpdateDeviceCount(count)
		return
	}
}