in controllers/custom/builder.go [93:188]
// Complete validates the builder's required dependencies, wires up the low
// level cache controller that lists and watches through the custom converter,
// and adds the resulting controller to the manager.
//
// It returns an error if the logger, converter, kubernetes clientset or data
// store has not been set. Otherwise it returns the controller's health checker
// together with the result of adding the controller to the manager.
func (b *Builder) Complete(reconciler Reconciler) (healthz.Checker, error) {
	// Loggr is no longer an interface
	// The suggestion is using LogSink to do nil check now
	if b.log.GetSink() == nil {
		return nil, fmt.Errorf("need to set the logger")
	}
	if b.converter == nil {
		return nil, fmt.Errorf("converter not provided, " +
			"must use high level controller if conversion not required")
	}
	if b.clientSet == nil {
		return nil, fmt.Errorf("need to set kubernetes clienset")
	}
	if b.dataStore == nil {
		return nil, fmt.Errorf("need datastore to start the controller")
	}

	b.SetDefaults()

	workQueue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), b.options.Name)

	optimizedListWatch := newOptimizedListWatcher(b.ctx, b.clientSet.CoreV1().RESTClient(),
		b.converter.Resource(), b.options.Namespace, b.converter, b.log.WithName("listWatcher"))

	// Create the config for low level controller with the custom converter
	// list and watch
	config := &cache.Config{
		Queue:             cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
		ListerWatcher:     optimizedListWatch,
		WatchListPageSize: int64(b.options.PageLimit),
		ObjectType:        b.converter.ResourceType(),
		FullResyncPeriod:  b.options.ResyncPeriod,
		Process: func(obj interface{}, _ bool) error {
			// Guard the assertion so an unexpected item surfaces as an
			// error instead of a panic.
			deltas, ok := obj.(cache.Deltas)
			if !ok {
				return fmt.Errorf("expected cache.Deltas, got %T", obj)
			}
			// from oldest to newest
			for _, d := range deltas {
				// Convert the object with the custom converter before it is
				// written to the data store
				convertedObj, err := b.converter.ConvertObject(d.Object)
				if err != nil {
					return err
				}
				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					// Surface lookup failures rather than silently falling
					// through to Add.
					_, exists, getErr := b.dataStore.Get(convertedObj)
					if getErr != nil {
						return getErr
					}
					if exists {
						err = b.dataStore.Update(convertedObj)
					} else {
						err = b.dataStore.Add(convertedObj)
					}
					if err != nil {
						return err
					}
					metaObj, ok := convertedObj.(metav1.Object)
					if !ok {
						return fmt.Errorf("failed to get object meta %v", obj)
					}
					// Add the namespace/name to the queue so multiple
					// duplicate events are processed only once at a time
					workQueue.Add(Request{
						NamespacedName: types.NamespacedName{
							Namespace: metaObj.GetNamespace(),
							Name:      metaObj.GetName(),
						},
					})
				case cache.Deleted:
					if err := b.dataStore.Delete(convertedObj); err != nil {
						return err
					}
					// Add entire object instead of namespace/name as from this
					// point onwards the object will no longer be present in cache
					workQueue.Add(Request{
						DeletedObject: convertedObj,
					})
				}
			}
			return nil
		},
	}

	controller := NewCustomController(
		b.log,
		b.options,
		config,
		reconciler,
		workQueue,
		b.conditions,
	)

	// Adds the controller to the manager's Runnable
	return controller.checker, b.mgr.Add(controller)
}