pkg/k8s/informer.go (57 lines of code) (raw):
package k8s
import (
"context"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type PodInformer struct {
K8sClient kubernetes.Interface
NodeName string
mu sync.Mutex
informerFactory informers.SharedInformerFactory
informer cache.SharedIndexInformer
eventHandlerCount int
}
func NewPodInformer(k8sClient kubernetes.Interface, nodeName string) *PodInformer {
return &PodInformer{
K8sClient: k8sClient,
NodeName: nodeName,
eventHandlerCount: 0,
}
}
// Add adds a handler to the informer. The handler will be called when a pod is added, updated, or deleted.
// Lazily creates and starts the informer if it does not already exist.
func (p *PodInformer) Add(ctx context.Context, handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.informerFactory == nil {
tweakOptions := informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
lo.FieldSelector = "spec.nodeName=" + p.NodeName
})
p.informerFactory = informers.NewSharedInformerFactoryWithOptions(p.K8sClient, time.Minute, tweakOptions)
p.informer = p.informerFactory.Core().V1().Pods().Informer()
p.informerFactory.Start(ctx.Done())
p.informerFactory.WaitForCacheSync(ctx.Done())
}
ret, err := p.informer.AddEventHandler(handler)
if err == nil {
p.eventHandlerCount++
}
return ret, err
}
// Remove removes a handler from the informer.
// Lazily shuts down the informer if there are no more handlers.
func (p *PodInformer) Remove(reg cache.ResourceEventHandlerRegistration) error {
p.mu.Lock()
defer p.mu.Unlock()
err := p.informer.RemoveEventHandler(reg)
if err == nil {
p.eventHandlerCount--
if p.eventHandlerCount == 0 {
p.informerFactory.Shutdown()
p.informerFactory = nil
p.informer = nil
}
}
return err
}