// pkg/localcontroller/controller.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package localcontroller

import (
"context"
"fmt"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreV1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
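
// watchedNodes is a mutex-guarded set of node names that currently have a
// prober registered. The map value is unused for now (always 0).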
type watchedNodes struct {
nodes map[string]int
m sync.Mutex
}
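
// Controller reacts to Node events from the informer and keeps track of
// which nodes are being watched by a node-level prober.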
type Controller struct {
kubeclientset kubernetes.Interface
nodesSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
watchedNodes *watchedNodes
}
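
// NewController returns a controller that reacts to Node add, update, and
// delete events delivered by nodesInformer. Event handlers are registered
// here; call Run to start the workers.
//
// A minimal wiring sketch (assuming an in-cluster client via client-go's
// rest package and a caller-provided stopCh shutdown channel; error
// handling elided):
//
//	cfg, _ := rest.InClusterConfig()
//	clientset := kubernetes.NewForConfigOrDie(cfg)
//	informer := coreV1informers.NewNodeInformer(clientset, 0, cache.Indexers{})
//	controller := NewController(clientset, informer)
//	go informer.Run(stopCh)
//	_ = controller.Run(1, stopCh)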
func NewController(
kubeclientset kubernetes.Interface,
nodesInformer cache.SharedIndexInformer) *Controller {
// Create a new controller
klog.V(1).Infof("Creating a local controller to manage node-level probers")
controller := &Controller{
kubeclientset: kubeclientset,
nodesSynced: nodesInformer.HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "nodeworkers"),
watchedNodes: &watchedNodes{nodes: make(map[string]int)},
}
klog.V(1).Info("Setting up event handlers")
nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
		// We only care about update events that actually change the Node,
		// since those are the cases where the prober on the node may need
		// to be restarted.
UpdateFunc: func(old, new interface{}) {
newNode := new.(*corev1.Node)
oldNode := old.(*corev1.Node)
if newNode.ResourceVersion == oldNode.ResourceVersion {
				// Periodic resync will send update events for all known nodes.
				// Two different versions of the same Node always carry different
				// resource versions, so identical versions mean this is just a
				// resync with no real change.
				klog.V(1).Info("Resync event with no change to the resource; nothing to do")
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}

// Run starts the controller: it waits for the informer caches to sync,
// launches the worker goroutines, and blocks until stopCh is closed.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
if ok := cache.WaitForCacheSync(stopCh, c.nodesSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
// Start n workers in parallel
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, 10*time.Second, stopCh)
}
<-stopCh
return nil
}
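
// runWorker is a long-running loop that keeps processing workqueue items
// until the queue shuts down.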
func (c *Controller) runWorker() {
for c.processNextQueueItem() {
}
}

// processNextQueueItem reads a single work item off the workqueue
// and attempts to process it.
func (c *Controller) processNextQueueItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
		// Run the syncHandler, passing it the key of the resource to be
		// synced (just the Node name, since Nodes are cluster-scoped).
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
		if err := c.watchedNodes.registerNode(key); err != nil {
			utilruntime.HandleError(err)
		}
klog.V(1).Infof("Successfully processed the change to the node '%s'", key)
return nil
}(obj)
	if err != nil {
		utilruntime.HandleError(err)
	}
	return true
}

// syncHandler holds the actual business logic: start the node prober when a
// new node is added, and shut the prober down when the node is deleted. It
// is currently a stub that only logs the node being processed.
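//
// A hypothetical fleshed-out version might first confirm the node still
// exists and then reconcile its prober (sketch only: ctx and the apierrors
// alias for "k8s.io/apimachinery/pkg/api/errors" are assumed, and the prober
// start/stop step is a placeholder):
//
//	if _, err := c.kubeclientset.CoreV1().Nodes().Get(ctx, key, metav1.GetOptions{}); err != nil {
//		if apierrors.IsNotFound(err) {
//			return c.watchedNodes.popNode(key) // node is gone, stop its prober
//		}
//		return err
//	}
//	// ... start or restart the prober for this node ...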
func (c *Controller) syncHandler(key string) error {
klog.V(1).Infof("Now processing the node <%s>\n", key)
return nil
}

// handleObject filters incoming events (including tombstones for deleted
// objects) and enqueues only the ones we care about.
func (c *Controller) handleObject(obj interface{}) {
klog.V(1).Infof("Detected Node changes!")
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
deleted, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = deleted.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding delete object, invalid type"))
return
}
}
c.enqueue(object)
}
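
// enqueue converts obj to a workqueue key and adds it to the queue, skipping
// nodes that are already being watched.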
func (c *Controller) enqueue(obj interface{}) {
var key string
var err error
	// Convert the object into a cache key. Nodes are cluster-scoped, so the
	// key is simply the node name (e.g. "node-1"); for namespaced objects it
	// would be "namespace/name".
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
	// TODO: consider moving this check into handleObject.
	// Take the lock before reading the map so we do not race with
	// registerNode and popNode on other goroutines.
	c.watchedNodes.m.Lock()
	_, watched := c.watchedNodes.nodes[key]
	c.watchedNodes.m.Unlock()
	if watched {
		klog.V(1).Infof("Node <%s> is already being watched, skipping enqueue", key)
		return
	}
c.workqueue.Add(key)
klog.V(1).Infof("Added the Node <%s> to workqueue\n", key)
}
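
// registerNode records a node as watched; it returns an error if the node
// is already registered.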
func (wd *watchedNodes) registerNode(node string) error {
wd.m.Lock()
defer wd.m.Unlock()
if _, ok := wd.nodes[node]; ok {
		return fmt.Errorf("node <%s> is already registered", node)
}
wd.nodes[node] = 0
return nil
}
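
// popNode removes a node from the watched set; it returns an error if the
// node was never registered.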
func (wd *watchedNodes) popNode(node string) error {
wd.m.Lock()
defer wd.m.Unlock()
if _, ok := wd.nodes[node]; ok {
delete(wd.nodes, node)
return nil
}
	return fmt.Errorf("node <%s> not found", node)
}
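
// StartController builds a node informer, wires it to a new controller, and
// blocks until ctx is cancelled.
//
// A minimal caller sketch (assuming Go's "os/signal" package and an
// in-cluster client via client-go's rest package; error handling elided):
//
//	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer stop()
//	cfg, _ := rest.InClusterConfig()
//	StartController(ctx, kubernetes.NewForConfigOrDie(cfg))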
func StartController(ctx context.Context, kubeclientset *kubernetes.Clientset) {
	// Close stopCh when ctx is cancelled so that the informer and the
	// controller workers observe the cancellation and shut down cleanly;
	// Run blocks on stopCh, so it would never return otherwise.
	stopCh := make(chan struct{})
	go func() {
		<-ctx.Done()
		close(stopCh)
	}()
	nodeInformer := coreV1informers.NewNodeInformer(kubeclientset, 0, cache.Indexers{})
	controller := NewController(kubeclientset, nodeInformer)
	go nodeInformer.Run(stopCh)
	// Run blocks until stopCh is closed.
	if err := controller.Run(1, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}