cns/deviceplugin/socketwatcher.go (63 lines of code) (raw):
package deviceplugin
import (
"context"
"os"
"sync"
"time"
"go.uber.org/zap"
)
const defaultStatInterval time.Duration = 5 * time.Second
type SocketWatcherOption func(*socketWatcherOptions)
type socketWatcherOptions struct {
statInterval time.Duration
}
func SocketWatcherStatInterval(d time.Duration) SocketWatcherOption {
return func(o *socketWatcherOptions) {
o.statInterval = d
}
}
type SocketWatcher struct {
socketChans map[string]<-chan struct{}
mutex sync.Mutex
logger *zap.Logger
options socketWatcherOptions
}
func NewSocketWatcher(logger *zap.Logger, opts ...SocketWatcherOption) *SocketWatcher {
defaultOptions := socketWatcherOptions{
statInterval: defaultStatInterval,
}
for _, o := range opts {
o(&defaultOptions)
}
return &SocketWatcher{
socketChans: make(map[string]<-chan struct{}),
logger: logger,
options: defaultOptions,
}
}
// watchSocket returns a channel that will be closed when the socket is removed or the context is cancelled
func (s *SocketWatcher) WatchSocket(ctx context.Context, socket string) <-chan struct{} {
s.mutex.Lock()
defer s.mutex.Unlock()
// if a socket is already being watched, return its channel
if ch, ok := s.socketChans[socket]; ok {
return ch
}
// otherwise, start watching it and return a new channel
socketChan := make(chan struct{})
s.socketChans[socket] = socketChan
go func() {
defer close(socketChan)
ticker := time.NewTicker(s.options.statInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if _, err := os.Lstat(socket); err != nil {
s.logger.Info("failed to stat socket", zap.Error(err))
return
}
}
}
}()
return socketChan
}