controllers/custom/custom_controller.go
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package custom
import (
"context"
"fmt"
"net/http"
"time"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
rcHealthz "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/healthz"
)
// Converter converts the k8s object and object list used in watch and list
// operations into the desired format before it is stored in the data store.
type Converter interface {
// ConvertObject takes an object and returns the modified object which will be
// stored in the data store
ConvertObject(originalObj interface{}) (convertedObj interface{}, err error)
// ConvertList takes an object list and returns the modified list of objects
// which will be returned to the Simple Pager function to aggregate the list
// pagination response
ConvertList(originalList interface{}) (convertedList interface{}, err error)
// Resource returns the K8s resource name to list/watch
Resource() string
// ResourceType returns the k8s object to list/watch
ResourceType() runtime.Object
// Indexer returns the key for indexing custom converted object
Indexer(obj interface{}) (string, error)
}
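// As a hedged illustration only (not the converter shipped in this repo), a
// Pod Converter might strip the Pod down to just the fields the reconciler
// reads. examplePodConverter is an assumed name, and corev1 is assumed to
// alias k8s.io/api/core/v1:
//
//	type examplePodConverter struct{}
//
//	func (e *examplePodConverter) ConvertObject(obj interface{}) (interface{}, error) {
//		pod, ok := obj.(*corev1.Pod)
//		if !ok {
//			return nil, fmt.Errorf("expected *corev1.Pod, got %T", obj)
//		}
//		// Keep only the metadata and spec fields that are actually used.
//		return &corev1.Pod{
//			ObjectMeta: metav1.ObjectMeta{
//				Name:        pod.Name,
//				Namespace:   pod.Namespace,
//				Annotations: pod.Annotations,
//			},
//			Spec: corev1.PodSpec{NodeName: pod.Spec.NodeName},
//		}, nil
//	}
//
//	// Indexer keys the converted object by namespace/name.
//	func (e *examplePodConverter) Indexer(obj interface{}) (string, error) {
//		return cache.MetaNamespaceKeyFunc(obj)
//	}
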
type Reconciler interface {
Reconcile(request Request) (ctrl.Result, error)
}
// Options contains the configurable parameters of the Custom Controller
type Options struct {
// Name of the controller used for creating named work queues
Name string
// PageLimit is the number of objects returned per page on a list operation
PageLimit int
// Namespace to list and watch for
Namespace string
// ResyncPeriod is how often to sync the cache using a list operation with the API Server
ResyncPeriod time.Duration
// MaxConcurrentReconciles to parallelize processing of worker queue
MaxConcurrentReconciles int
}
// This Controller can be used for any type of K8s object, but is used for Pod Objects
// in this repository. There are two reasons why we are using a wrapper over the low level
// controllers instead of using controllers from controller-runtime.
// 1. We don't want to cache the entire Pod Object because of memory constraints.
// We only need specific details from the metadata and the Pod Spec. To do this we
// intercept the response at list and watch and strip it down before it's stored
// in the cache. The long term plan is to use a metadata-only cache or to disable
// Pod caching altogether.
// 2. We want the deleted Object when a Pod is terminating. Pod networking should
// only be deleted once the Pod has been deleted or all containers have exited.
// The long term plan is to consider migrating to finalizers and deleting only
// when all containers have exited.
//
// In the future, we may be able to switch to the upstream controller for
// reconciling Pods if the long term solutions are in place.
type CustomController struct {
// workQueue to store create/update/delete events
workQueue workqueue.RateLimitingInterface
// log for custom controller
log logr.Logger
// Reconciler will be called on all the K8s object events
Do Reconciler
// config to create a new client-go controller
config *cache.Config
// options is the configurable parameters for creating
// the controller
options Options
conditions condition.Conditions
checker healthz.Checker
}
// A Request for Add/Update only contains the Namespace/Name. A Request for
// Delete contains the Pod Object itself, because by the time the Delete
// Request is reconciled the cache will no longer have it.
type Request struct {
// Add/Update Request will contain the Namespaced Name only. The
// item can be retrieved from the indexer for add/update events
NamespacedName types.NamespacedName
// Delete Event will contain the DeletedObject only.
DeletedObject interface{}
}
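// A minimal Reconciler sketch under these conventions. exampleReconciler,
// handleDelete, handleCreateOrUpdate, and the podIndexer field are assumed
// names, with podIndexer being the cache.Indexer that stores the converted
// objects:
//
//	func (r *exampleReconciler) Reconcile(request Request) (ctrl.Result, error) {
//		if request.DeletedObject != nil {
//			// Delete event: the cache no longer holds the Pod, so use
//			// the object carried on the Request itself.
//			return r.handleDelete(request.DeletedObject)
//		}
//		// Add/Update event: look the stripped-down Pod up in the indexer.
//		obj, exists, err := r.podIndexer.GetByKey(request.NamespacedName.String())
//		if err != nil || !exists {
//			return ctrl.Result{}, err
//		}
//		return r.handleCreateOrUpdate(obj)
//	}
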
func NewCustomController(
log logr.Logger,
options Options,
config *cache.Config,
reconciler Reconciler,
workQueue workqueue.RateLimitingInterface,
conditions condition.Conditions) *CustomController {
cc := &CustomController{
log: log,
options: options,
config: config,
Do: reconciler,
workQueue: workQueue,
conditions: conditions,
}
cc.checker = cc.CustomCheck()
return cc
}
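// A hedged wiring sketch; mgr, config, reconciler, and conditions are assumed
// to already exist, and the real wiring in this repo is done by its controller
// builder rather than inline like this:
//
//	queue := workqueue.NewNamedRateLimitingQueue(
//		workqueue.DefaultControllerRateLimiter(), "pod")
//	controller := NewCustomController(
//		ctrl.Log.WithName("custom pod controller"),
//		Options{
//			Name:                    "pod",
//			PageLimit:               100,
//			ResyncPeriod:            30 * time.Minute,
//			MaxConcurrentReconciles: 2,
//		},
//		config, // *cache.Config built around newOptimizedListWatcher
//		reconciler,
//		queue,
//		conditions,
//	)
//	// CustomController implements Start(ctx), so the manager can run it.
//	if err := mgr.Add(controller); err != nil {
//		return err
//	}
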
// Start starts the low level controller and its workers once the cache has synced
func (c *CustomController) Start(ctx context.Context) error {
	// Syncing the data store before the other controllers start is important,
	// so the cache sync below must complete before they are allowed to run.
	// Shut down the queue on exit so the workers can stop.
	defer c.workQueue.ShutDown()
err := func() error {
coreController := cache.New(c.config)
c.log.Info("starting custom controller")
go coreController.Run(ctx.Done())
		// Wait till the cache has synced
c.WaitForCacheSync(coreController)
c.log.Info("Starting Workers", "worker count",
c.options.MaxConcurrentReconciles)
for i := 0; i < c.options.MaxConcurrentReconciles; i++ {
go wait.Until(c.worker, time.Second, ctx.Done())
}
return nil
}()
if err != nil {
return err
}
<-ctx.Done()
c.log.Info("stopping workers")
return nil
}
// WaitForCacheSync waits till the cache has synced. This must be done under a
// mutex lock to prevent other controllers from starting at the same time
func (c *CustomController) WaitForCacheSync(controller cache.Controller) {
for !controller.HasSynced() && controller.LastSyncResourceVersion() == "" {
c.log.Info("waiting for controller to sync")
time.Sleep(time.Second * 5)
}
c.conditions.SetPodDataStoreSyncStatus(true)
c.log.Info("cache has synced successfully")
}
// newOptimizedListWatcher returns a list watcher with a custom list function
// that converts the response for each page using the converter function, and
// a standard watch function
func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resource string, namespace string,
converter Converter, log logr.Logger) *cache.ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
list, err := restClient.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&metav1.ListOptions{
Limit: options.Limit,
Continue: options.Continue,
}, metav1.ParameterCodec).
Do(ctx).
Get()
if err != nil {
if statusErr, ok := err.(*apierrors.StatusError); ok {
log.Error(err, "List operation error", "code", statusErr.Status().Code)
} else {
log.Error(err, "List operation error")
}
return nil, err
}
		// Strip down the list before passing the paginated response back to
		// the pager function
		convertedList, err := converter.ConvertList(list)
		if err != nil {
			// Return early to avoid type asserting a nil interface below
			return nil, err
		}
		return convertedList.(runtime.Object), nil
}
// We don't need to modify the watcher, we will strip down the k8s object in the ProcessFunc
// before storing the object in the data store.
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
		w, err := restClient.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(ctx)
if err != nil {
if statusErr, ok := err.(*apierrors.StatusError); ok {
log.Error(err, "Watch operation error", "code", statusErr.Status().Code)
} else {
log.Error(err, "Watch operation error")
}
return nil, err
}
		return w, err
}
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
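// A hedged sketch of how this ListWatch might be assembled into the
// *cache.Config that Start consumes. indexer, restClient, converter, and
// workQueue are assumed to exist, and the Process signature shown matches
// older client-go releases (newer ones add an isInInitialList parameter):
//
//	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{KnownObjects: indexer})
//	config := &cache.Config{
//		Queue:            fifo,
//		ListerWatcher:    newOptimizedListWatcher(ctx, restClient, converter.Resource(), namespace, converter, log),
//		ObjectType:       converter.ResourceType(),
//		FullResyncPeriod: options.ResyncPeriod,
//		Process: func(obj interface{}) error {
//			for _, d := range obj.(cache.Deltas) {
//				// Strip the object down before it reaches the data store.
//				converted, err := converter.ConvertObject(d.Object)
//				if err != nil {
//					return err
//				}
//				switch d.Type {
//				case cache.Sync, cache.Added, cache.Updated:
//					if err := indexer.Add(converted); err != nil {
//						return err
//					}
//					key, _ := converter.Indexer(converted)
//					ns, name, _ := cache.SplitMetaNamespaceKey(key)
//					workQueue.Add(Request{NamespacedName: types.NamespacedName{Namespace: ns, Name: name}})
//				case cache.Deleted:
//					if err := indexer.Delete(converted); err != nil {
//						return err
//					}
//					workQueue.Add(Request{DeletedObject: converted})
//				}
//			}
//			return nil
//		},
//	}
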
func (c *CustomController) worker() {
for c.processNextWorkItem() {
}
}
func (c *CustomController) processNextWorkItem() bool {
obj, shutdown := c.workQueue.Get()
if shutdown {
// Stop working
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workQueue.Done(obj)
// The item from the workqueue will be forgotten in the handler, when
// it's successfully processed.
return c.reconcileHandler(obj)
}
func (c *CustomController) reconcileHandler(obj interface{}) bool {
var req Request
var ok bool
if req, ok = obj.(Request); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workQueue.Forget(obj)
c.log.Error(nil, "Queue item was not a Request",
"type", fmt.Sprintf("%T", obj), "value", obj)
// Return true, don't take a break
return true
}
	// Invoke the Reconciler, passing it the Namespace/Name of the
	// resource to be synced.
if result, err := c.Do.Reconcile(req); err != nil {
c.workQueue.AddRateLimited(req)
c.log.Error(err, "Reconciler error", "request", req)
return false
} else if result.RequeueAfter > 0 {
		// The result.RequeueAfter request will be lost if it is returned
		// along with a non-nil error, but this is intended: we need to
		// drive to stable reconcile loops before requeuing due to
		// result.RequeueAfter
c.workQueue.Forget(obj)
c.workQueue.AddAfter(req, result.RequeueAfter)
return true
} else if result.Requeue {
c.workQueue.AddRateLimited(req)
return true
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workQueue.Forget(obj)
c.log.V(1).Info("Successfully Reconciled", "request", req)
// Return true, don't take a break
return true
}
// CustomCheck returns a healthz.Checker that verifies the controller's work
// queue is still responsive by invoking a harmless call against it.
func (c *CustomController) CustomCheck() healthz.Checker {
return func(req *http.Request) error {
err := rcHealthz.PingWithTimeout(func(status chan<- error) {
var ping interface{}
c.workQueue.NumRequeues(ping)
c.log.V(1).Info("***** health check on custom pod controller tested workQueue NumRequeues *****")
status <- nil
}, c.log)
return err
}
}