pkg/k8s/custom_controller.go (177 lines of code) (raw):
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package k8s
import (
"context"
"time"
"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Converter for converting objects on provided Deltas to stripped down pod objects
type Converter interface {
// ConvertObject takes an object or delete tombstone and returns
// the modified object which will be stored in the data store
ConvertObject(originalObj interface{}) (convertedObj interface{}, err error)
// ConvertList takes an object and returns the modified list of objects which
// will be returned to the Simple Pager function to aggregate the list pagination
// response
ConvertList(originalList interface{}) (convertedList interface{}, err error)
// Resource returns the K8s resource name to list/watch
Resource() string
// ResourceType returns the k8s object to list/watch
ResourceType() runtime.Object
}
// Controller Interface implemented by PodController
type Controller interface {
// StartController starts the controller. Will block the calling routine
StartController(dataStore cache.Indexer, stopChanel chan struct{})
// GetDataStore returns the data store once it has synced with the API Server
GetDataStore() cache.Indexer
}
// CustomController is an Informer which converts Pod Objects and notifies corresponding event handlers via Channels
type CustomController struct {
// clientSet is the kubernetes client set
clientSet *kubernetes.Clientset
// pageLimit is the number of objects returned per page on a list operation
pageLimit int64
// namespace to list/watch for
namespace string
// converter is the converter implementation that converts the k8s
// object before storing in the data store
converter Converter
// resyncPeriod how often to sync using list with the API Server
resyncPeriod time.Duration
// retryOnError whether item should be retried on error. Should remain false in usual use case
retryOnError bool
// queue is the Delta FIFO queue
queue *cache.DeltaFIFO
// podEventNotificationChan channel will be notified for all pod events
eventNotificationChan chan<- GenericEvent
// log for custom controller
log logr.Logger
// controller is the K8s Controller
controller cache.Controller
// dataStore with the converted k8s object. It should not be directly accessed and used with
// the exposed APIs
dataStore cache.Indexer
}
// NewCustomController returns a new podController object
func NewCustomController(clientSet *kubernetes.Clientset, pageLimit int64, namesspace string, converter Converter, resyncPeriod time.Duration,
retryOnError bool, eventNotificationChan chan<- GenericEvent, log logr.Logger) *CustomController {
c := &CustomController{
clientSet: clientSet,
pageLimit: pageLimit,
namespace: namesspace,
converter: converter,
resyncPeriod: resyncPeriod,
retryOnError: retryOnError,
eventNotificationChan: eventNotificationChan,
log: log,
}
c.dataStore = cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{NamespaceIndexKey: NamespaceKeyIndexFunc()})
c.queue = cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, c.dataStore)
return c
}
// StartController starts the custom controller by doing a list and watch on the specified k8s
// resource. The controller would store the converted k8s object in the provided indexer. The
// stop channel should be notified to stop the controller
func (c *CustomController) StartController(stopChanel <-chan struct{}) {
config := &cache.Config{
Queue: c.queue,
ListerWatcher: newListWatcher(c.clientSet.CoreV1().RESTClient(),
c.converter.Resource(), c.namespace, c.pageLimit, c.converter, c.log),
ObjectType: c.converter.ResourceType(),
FullResyncPeriod: c.resyncPeriod,
RetryOnError: c.retryOnError,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
// Strip down the pod object and keep only the required details
convertedObj, err := c.converter.ConvertObject(d.Object)
if err != nil {
return err
}
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
c.log.V(1).Info("Received Cache event", "event type", d.Type)
if old, exists, err := c.dataStore.Get(convertedObj); err == nil && exists {
c.log.V(1).Info("Update event", "pod Ip", convertedObj.(*v1.Pod).Status.PodIP)
if err := c.dataStore.Update(convertedObj); err != nil {
return err
}
if err := c.notifyChannelOnUpdate(old, convertedObj); err != nil {
return err
}
} else if err == nil && !exists {
c.log.V(1).Info("Add/Create event", "pod Ip", convertedObj.(*v1.Pod).Status.PodIP)
if err := c.dataStore.Add(convertedObj); err != nil {
return err
}
if err := c.notifyChannelOnCreate(convertedObj); err != nil {
return err
}
} else {
return err
}
case cache.Deleted:
c.log.V(1).Info("Delete event", "pod Ip", convertedObj.(*v1.Pod).Status.PodIP)
if err := c.dataStore.Delete(convertedObj); err != nil {
return err
}
if err := c.notifyChannelOnDelete(convertedObj); err != nil {
return err
}
}
}
return nil
},
}
c.controller = cache.New(config)
// Run the controller
c.controller.Run(stopChanel)
}
// GetDataStore returns the data store when it has successfully synced with API Server
func (c *CustomController) GetDataStore() cache.Indexer {
// Custom data store, it should not be accessed directly as the cache could be out of sync
// on startup. Must be accessed from the pod controller's data store instead
// TODO: we should refactor this in the future, as this approach will make controllers to run without having pod synced.
// (It thus blocks when pod information is accessed)
for c.controller == nil || (!c.controller.HasSynced() && c.controller.LastSyncResourceVersion() == "") {
c.log.Info("waiting for controller to sync")
time.Sleep(time.Second * 5)
}
return c.dataStore
}
// newListWatcher returns a list watcher with a custom list function that converts the
// response for each page using the converter function and returns a general watcher
func newListWatcher(restClient cache.Getter, resource string, namespace string, limit int64,
converter Converter, log logr.Logger) *cache.ListWatch {
log.V(1).Info("Initializing List Watcher")
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
ctx := context.Background()
list, err := restClient.Get().
Namespace(namespace).
Resource(resource).
// This needs to be done because just setting the limit using option's
// Limit is being overridden and the response is returned without pagination.
VersionedParams(&metav1.ListOptions{
Limit: limit,
Continue: options.Continue,
}, metav1.ParameterCodec).
Do(ctx).
Get()
printList(list, "Original List", log)
if err != nil {
return list, err
}
// Strip down the the list before passing the paginated response back to
// the pager function
convertedList, err := converter.ConvertList(list)
printList(convertedList, "Converted List", log)
return convertedList.(runtime.Object), err
}
// We don't need to modify the watcher, we will strip down the k8s object in the ProcessFunc
// before storing the object in the data store.
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
ctx := context.Background()
options.Watch = true
return restClient.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(ctx)
}
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
func printList(list interface{}, listType string, log logr.Logger) {
podList, ok := list.(*v1.PodList)
if !ok {
log.V(1).Info("Error converting in printList function")
}
ip := make([]string, 0, len(podList.Items))
for _, pod := range podList.Items {
ip = append(ip, pod.Status.PodIP)
}
log.V(1).Info("List of PodIPs", listType, ip)
}
// notifyChannelOnCreate notifies the add event on the appropriate channel
func (c *CustomController) notifyChannelOnCreate(obj interface{}) error {
c.eventNotificationChan <- GenericEvent{
EventType: CREATE,
Object: obj.(client.Object),
}
return nil
}
// notifyChannelOnCreate notifies the add event on the appropriate channel
func (c *CustomController) notifyChannelOnUpdate(oldObj, newObj interface{}) error {
c.eventNotificationChan <- GenericEvent{
EventType: UPDATE,
OldObject: oldObj.(client.Object),
Object: newObj.(client.Object),
}
return nil
}
// notifyChannelOnDelete notifies the delete event on the appropriate channel
func (c *CustomController) notifyChannelOnDelete(obj interface{}) error {
c.eventNotificationChan <- GenericEvent{
EventType: DELETE,
OldObject: obj.(client.Object),
}
return nil
}