controllers/custom/builder.go (151 lines of code) (raw):
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package custom
import (
"context"
"fmt"
"time"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
type Builder struct {
// options has the configurable parameters for the custom controller
options Options
// converter which will be used to convert a k8s object into desired
// format before it's stored in the data store
converter Converter
// clientSet is the kubernetes client set
clientSet *kubernetes.Clientset
// dataStore with the converted k8s object, objects being watched by the
// controller must be queried using this datastore
dataStore cache.Indexer
// mgr is the controller runtime manager
mgr manager.Manager
log logr.Logger
ctx context.Context
conditions condition.Conditions
}
func (b *Builder) Named(name string) *Builder {
b.options.Name = name
return b
}
func (b *Builder) UsingConverter(converter Converter) *Builder {
b.converter = converter
return b
}
func (b *Builder) WithClientSet(clientSet *kubernetes.Clientset) *Builder {
b.clientSet = clientSet
return b
}
func (b *Builder) UsingDataStore(dataStore cache.Indexer) *Builder {
b.dataStore = dataStore
return b
}
func (b *Builder) WithLogger(logger logr.Logger) *Builder {
b.log = logger
return b
}
func (b *Builder) Options(options Options) *Builder {
b.options = options
return b
}
func (b *Builder) UsingConditions(conditions condition.Conditions) *Builder {
b.conditions = conditions
return b
}
func NewControllerManagedBy(ctx context.Context, mgr manager.Manager) *Builder {
return &Builder{mgr: mgr,
ctx: ctx}
}
// Complete adds the controller to manager's Runnable. The Controller
// runnable will start when the manager starts
func (b *Builder) Complete(reconciler Reconciler) (healthz.Checker, error) {
// Loggr is no longer an interface
// The suggestion is using LogSink to do nil check now
if b.log.GetSink() == nil {
return nil, fmt.Errorf("need to set the logger")
}
if b.converter == nil {
return nil, fmt.Errorf("converter not provided, " +
"must use high level controller if conversion not required")
}
if b.clientSet == nil {
return nil, fmt.Errorf("need to set kubernetes clienset")
}
if b.dataStore == nil {
return nil, fmt.Errorf("need datastore to start the controller")
}
b.SetDefaults()
workQueue := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), b.options.Name)
optimizedListWatch := newOptimizedListWatcher(b.ctx, b.clientSet.CoreV1().RESTClient(),
b.converter.Resource(), b.options.Namespace, b.converter, b.log.WithName("listWatcher"))
// Create the config for low level controller with the custom converter
// list and watch
config := &cache.Config{
Queue: cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
ListerWatcher: optimizedListWatch,
WatchListPageSize: int64(b.options.PageLimit),
ObjectType: b.converter.ResourceType(),
FullResyncPeriod: b.options.ResyncPeriod,
Process: func(obj interface{}, _ bool) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
// Strip down the pod object and keep only the required details
convertedObj, err := b.converter.ConvertObject(d.Object)
if err != nil {
return err
}
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if _, exists, err := b.dataStore.Get(convertedObj); err == nil && exists {
if err := b.dataStore.Update(convertedObj); err != nil {
return err
}
} else {
if err := b.dataStore.Add(convertedObj); err != nil {
return err
}
}
if err != nil {
return err
}
metaObj, ok := convertedObj.(metav1.Object)
if !ok {
return fmt.Errorf("failed to get object meta %v", obj)
}
// Add the namespace/name to the queue so multiple
// duplicate events are processed only once at a time
workQueue.Add(Request{
NamespacedName: types.NamespacedName{
Namespace: metaObj.GetNamespace(),
Name: metaObj.GetName(),
},
})
case cache.Deleted:
if err := b.dataStore.Delete(convertedObj); err != nil {
return err
}
// Add entire object instead of namespace/name as from this
// point onwards the object will no longer be present in cache
workQueue.Add(Request{
DeletedObject: convertedObj,
})
}
}
return nil
},
}
controller := NewCustomController(
b.log,
b.options,
config,
reconciler,
workQueue,
b.conditions,
)
// Adds the controller to the manager's Runnable
return controller.checker, b.mgr.Add(controller)
}
// SetDefaults sets the default options for controller
func (b *Builder) SetDefaults() {
if b.options.Name == "" {
b.options.Name = fmt.Sprintf("%s custom controller", b.converter.Resource())
}
if b.options.Namespace == "" {
b.options.Namespace = metav1.NamespaceAll
}
if b.options.MaxConcurrentReconciles == 0 {
b.options.MaxConcurrentReconciles = 1
}
if b.options.PageLimit == 0 {
b.options.PageLimit = 100
}
if b.options.ResyncPeriod == 0 {
b.options.ResyncPeriod = 30 * time.Minute
}
}