pkg/k8s/custom_source.go (108 lines of code) (raw):
package k8s
import (
"context"
"fmt"
"sync"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source"
)
const (
// defaultBufferSize is the default number of event notifications that can be buffered.
defaultBufferSize = 1024
)
var _ source.Source = &NotificationChannel{}
// EventType identifies type of PodEvent (CREATE/UPDATE/DELETE)
type EventType int
const (
// CREATE event for given object
CREATE EventType = 1
// DELETE event for given object
DELETE EventType = 2
// UPDATE event for given object
UPDATE EventType = 3
)
// GenericEvent is a wrapper for Create/Update/Delete Events
type GenericEvent struct {
EventType
// Object is the object from the incoming request
Object client.Object
// OldObject is the existing object. Only populated for DELETE and UPDATE requests.
OldObject client.Object
}
// NotificationChannel monitors channels of type Create/Update/Delete
type NotificationChannel struct {
// once ensures the event distribution goroutine will be performed only once
once sync.Once
// stop is to end ongoing goroutine, and close the Create channel
stop <-chan struct{}
Source <-chan GenericEvent
// dest is the destination channels of the Pod event handlers
dest []chan GenericEvent
// destBufferSize is the specified buffer size of dest channels.
// Default to 1024 if not specified.
destBufferSize int
// destLock is to ensure the destination channels are safely added/removed
destLock sync.Mutex
}
var _ inject.Stoppable = &NotificationChannel{}
// InjectStopChannel is internal should be called only by the Controller.
// It is used to inject the stop channel initialized by the ControllerManager.
func (cs *NotificationChannel) InjectStopChannel(stop <-chan struct{}) error {
if cs.stop == nil {
cs.stop = stop
}
return nil
}
func (cs *NotificationChannel) String() string {
return fmt.Sprintf("channel source: %p", cs)
}
// Start implements Source and should only be called by the Controller.
func (cs *NotificationChannel) Start(
ctx context.Context,
handler handler.EventHandler,
queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Source should have been specified by the user.
if cs.Source == nil {
return fmt.Errorf("must specify CustomChannle.Source")
}
// stop should have been injected before Start was called
if cs.stop == nil {
return fmt.Errorf("must call InjectStop on Channel before calling Start")
}
// use default value if DestBufferSize not specified
if cs.destBufferSize == 0 {
cs.destBufferSize = defaultBufferSize
}
cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop()
})
dst := make(chan GenericEvent, cs.destBufferSize)
go func() {
for evt := range dst {
switch evt.EventType {
case CREATE:
handler.Create(event.CreateEvent{Object: evt.Object}, queue)
case DELETE:
handler.Delete(event.DeleteEvent{Object: evt.OldObject}, queue)
case UPDATE:
handler.Update(event.UpdateEvent{ObjectOld: evt.OldObject, ObjectNew: evt.Object}, queue)
default:
_ = fmt.Errorf("Invalid Type %T", evt.EventType)
}
}
}()
cs.destLock.Lock()
defer cs.destLock.Unlock()
cs.dest = append(cs.dest, dst)
return nil
}
func (cs *NotificationChannel) doStop() {
cs.destLock.Lock()
defer cs.destLock.Unlock()
for _, dst := range cs.dest {
close(dst)
}
}
// distribute reads the Source channel and add events to its
// internal destination buffer
func (cs *NotificationChannel) distribute(evt GenericEvent) {
cs.destLock.Lock()
defer cs.destLock.Unlock()
for _, dst := range cs.dest {
// We cannot make it under goroutine here, or we'll meet the
// race condition of writing message to closed channels.
// To avoid blocking, the dest channels are expected to be of
// proper buffer size. If we still see it blocked, then
// the controller is thought to be in an abnormal state.
dst <- evt
}
}
// syncLoop keeps running and it monitors the stop and Source channel
// If there is an event on Source channel it dispatches it to internal destination buffer
func (cs *NotificationChannel) syncLoop() {
for {
select {
case <-cs.stop:
cs.doStop()
return
case evt := <-cs.Source:
cs.distribute(evt)
}
}
}