kubernetes/eventhandler.go (144 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kubernetes
import (
"reflect"
"sync"
)
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// - OnAdd is called when an object is added.
// - OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
// - OnDelete will get the final state of the item if it is known, otherwise
// it will get an object of type DeletedFinalStateUnknown. This can
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
//
// idea: allow the On* methods to return an error so that the RateLimited WorkQueue
// idea: can requeue the failed event processing.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(obj interface{})
OnDelete(obj interface{})
}
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(obj interface{})
DeleteFunc func(obj interface{})
}
// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}
// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnUpdate(obj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(obj)
}
}
// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}
// NoOpEventHandlerFuncs ensures that watcher reconciliation can happen even without the required funcs
type NoOpEventHandlerFuncs struct {
}
// OnAdd does a no-op on an add event
func (n NoOpEventHandlerFuncs) OnAdd(obj interface{}) {
}
// OnUpdate does a no-op on an update event
func (n NoOpEventHandlerFuncs) OnUpdate(obj interface{}) {
}
// OnDelete does a no-op on a delete event
func (n NoOpEventHandlerFuncs) OnDelete(obj interface{}) {
}
// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
type FilteringResourceEventHandler struct {
FilterFunc func(obj interface{}) bool
Handler ResourceEventHandler
}
// OnAdd calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnAdd(obj)
}
// OnUpdate ensures the proper handler is called depending on whether the filter matches
func (r FilteringResourceEventHandler) OnUpdate(obj interface{}) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnUpdate(obj)
}
// OnDelete calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnDelete(obj)
}
// podUpdaterHandlerFunc is a function that handles pod updater notifications.
type podUpdaterHandlerFunc func(interface{})
// podUpdaterStore is the interface that an object needs to implement to be
// used as a pod updater store.
type podUpdaterStore interface {
List() []interface{}
}
// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
namespaceWatcher Watcher
locker sync.Locker
}
// NewNamespacePodUpdater creates a namespacePodUpdater
func NewNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, namespaceWatcher Watcher, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
namespaceWatcher: namespaceWatcher,
locker: locker,
}
}
// OnUpdate handles update events on namespaces.
func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
ns, ok := obj.(*Namespace)
if !ok {
return
}
// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
if n.locker != nil {
n.locker.Lock()
defer n.locker.Unlock()
}
cachedObject := n.namespaceWatcher.CachedObject()
cachedNamespace, ok := cachedObject.(*Namespace)
if ok && ns.Name == cachedNamespace.Name {
labelscheck := reflect.DeepEqual(ns.ObjectMeta.Labels, cachedNamespace.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(ns.ObjectMeta.Annotations, cachedNamespace.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}
}
}
// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
// namespace they will generate their own add events.
func (*namespacePodUpdater) OnAdd(interface{}) {}
// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*namespacePodUpdater) OnDelete(interface{}) {}
// nodePodUpdater notifies updates on pods when their nodes are updated.
type nodePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
nodeWatcher Watcher
locker sync.Locker
}
// NewNodePodUpdater creates a nodePodUpdater
func NewNodePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, nodeWatcher Watcher, locker sync.Locker) *nodePodUpdater {
return &nodePodUpdater{
handler: handler,
store: store,
nodeWatcher: nodeWatcher,
locker: locker,
}
}
// OnUpdate handles update events on nodes.
func (n *nodePodUpdater) OnUpdate(obj interface{}) {
node, ok := obj.(*Node)
if !ok {
return
}
// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
if n.locker != nil {
n.locker.Lock()
defer n.locker.Unlock()
}
cachedObject := n.nodeWatcher.CachedObject()
cachedNode, ok := cachedObject.(*Node)
if ok && node.Name == cachedNode.Name {
labelscheck := reflect.DeepEqual(node.ObjectMeta.Labels, cachedNode.ObjectMeta.Labels)
annotationscheck := reflect.DeepEqual(node.ObjectMeta.Annotations, cachedNode.ObjectMeta.Annotations)
// Only if there is a difference in Metadata labels or annotations proceed to Pod update
if !labelscheck || !annotationscheck {
for _, pod := range n.store.List() {
pod, ok := pod.(*Pod)
if ok && pod.Spec.NodeName == node.Name {
n.handler(pod)
}
}
}
}
}
// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
// namespace they will generate their own add events.
func (*nodePodUpdater) OnAdd(interface{}) {}
// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*nodePodUpdater) OnDelete(interface{}) {}