func()

in controllers/custom/builder.go [93:188]


func (b *Builder) Complete(reconciler Reconciler) (healthz.Checker, error) {
	// Loggr is no longer an interface
	// The suggestion is using LogSink to do nil check now
	if b.log.GetSink() == nil {
		return nil, fmt.Errorf("need to set the logger")
	}
	if b.converter == nil {
		return nil, fmt.Errorf("converter not provided, " +
			"must use high level controller if conversion not required")
	}
	if b.clientSet == nil {
		return nil, fmt.Errorf("need to set kubernetes clienset")
	}
	if b.dataStore == nil {
		return nil, fmt.Errorf("need datastore to start the controller")
	}

	b.SetDefaults()

	workQueue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), b.options.Name)

	optimizedListWatch := newOptimizedListWatcher(b.ctx, b.clientSet.CoreV1().RESTClient(),
		b.converter.Resource(), b.options.Namespace, b.converter, b.log.WithName("listWatcher"))

	// Create the config for low level controller with the custom converter
	// list and watch
	config := &cache.Config{
		Queue:             cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
		ListerWatcher:     optimizedListWatch,
		WatchListPageSize: int64(b.options.PageLimit),
		ObjectType:        b.converter.ResourceType(),
		FullResyncPeriod:  b.options.ResyncPeriod,
		Process: func(obj interface{}, _ bool) error {
			// from oldest to newest
			for _, d := range obj.(cache.Deltas) {
				// Strip down the pod object and keep only the required details
				convertedObj, err := b.converter.ConvertObject(d.Object)
				if err != nil {
					return err
				}
				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					if _, exists, err := b.dataStore.Get(convertedObj); err == nil && exists {
						if err := b.dataStore.Update(convertedObj); err != nil {
							return err
						}
					} else {
						if err := b.dataStore.Add(convertedObj); err != nil {
							return err
						}
					}
					if err != nil {
						return err
					}
					metaObj, ok := convertedObj.(metav1.Object)
					if !ok {
						return fmt.Errorf("failed to get object meta %v", obj)
					}

					// Add the namespace/name to the queue so multiple
					// duplicate events are processed only once at a time
					workQueue.Add(Request{
						NamespacedName: types.NamespacedName{
							Namespace: metaObj.GetNamespace(),
							Name:      metaObj.GetName(),
						},
					})

				case cache.Deleted:
					if err := b.dataStore.Delete(convertedObj); err != nil {
						return err
					}
					// Add entire object instead of namespace/name as from this
					// point onwards the object will no longer be present in cache
					workQueue.Add(Request{
						DeletedObject: convertedObj,
					})
				}
			}
			return nil
		},
	}

	controller := NewCustomController(
		b.log,
		b.options,
		config,
		reconciler,
		workQueue,
		b.conditions,
	)

	// Adds the controller to the manager's Runnable
	return controller.checker, b.mgr.Add(controller)
}