func()

in controllers/custom/builder.go [91:185]


// Complete validates that the builder has every required dependency, builds
// the low level controller that lists/watches the converter's resource, and
// registers that controller with the manager.
//
// The reconciler is invoked by the resulting controller for the requests that
// the delta processing function places on the work queue.
//
// An error is returned when a required dependency (logger, converter, client
// set, data store or data store sync flag) is missing, or when the manager
// rejects the controller.
func (b *Builder) Complete(reconciler Reconciler) error {
	if b.log == nil {
		return fmt.Errorf("need to set the logger")
	}
	if b.converter == nil {
		return fmt.Errorf("converter not provided, " +
			"must use high level controller if conversion not required")
	}
	if b.clientSet == nil {
		return fmt.Errorf("need to set kubernetes clienset")
	}
	if b.dataStore == nil {
		return fmt.Errorf("need datastore to start the controller")
	}
	if b.dataStoreSyncFlag == nil {
		return fmt.Errorf("data store sync flag cannot be null")
	}
	b.SetDefaults()

	workQueue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), b.options.Name)

	optimizedListWatch := newOptimizedListWatcher(b.ctx, b.clientSet.CoreV1().RESTClient(),
		b.converter.Resource(), b.options.Namespace, b.options.PageLimit, b.converter)

	// Create the config for low level controller with the custom converter
	// list and watch
	config := &cache.Config{
		Queue:            cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
		ListerWatcher:    optimizedListWatch,
		ObjectType:       b.converter.ResourceType(),
		FullResyncPeriod: b.options.ResyncPeriod,
		Process: func(obj interface{}) error {
			// Use a checked assertion so an unexpected payload surfaces as an
			// error instead of panicking the processing loop.
			deltas, ok := obj.(cache.Deltas)
			if !ok {
				return fmt.Errorf("expected cache.Deltas, got %T", obj)
			}
			// from oldest to newest
			for _, d := range deltas {
				// Strip down the pod object and keep only the required details
				convertedObj, err := b.converter.ConvertObject(d.Object)
				if err != nil {
					return err
				}
				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					// A failed lookup must be reported as is; falling through
					// to Add would hide the real cause of the failure.
					_, exists, getErr := b.dataStore.Get(convertedObj)
					if getErr != nil {
						return getErr
					}
					if exists {
						if err := b.dataStore.Update(convertedObj); err != nil {
							return err
						}
					} else {
						if err := b.dataStore.Add(convertedObj); err != nil {
							return err
						}
					}
					metaObj, ok := convertedObj.(metav1.Object)
					if !ok {
						return fmt.Errorf("failed to get object meta %v", obj)
					}

					// Add the namespace/name to the queue so multiple
					// duplicate events are processed only once at a time
					workQueue.Add(Request{
						NamespacedName: types.NamespacedName{
							Namespace: metaObj.GetNamespace(),
							Name:      metaObj.GetName(),
						},
					})

				case cache.Deleted:
					if err := b.dataStore.Delete(convertedObj); err != nil {
						return err
					}
					// Add entire object instead of namespace/name as from this
					// point onwards the object will no longer be present in cache
					workQueue.Add(Request{
						DeletedObject: convertedObj,
					})
				}
			}
			return nil
		},
	}

	controller := &CustomController{
		log:       b.log,
		options:   b.options,
		config:    config,
		Do:        reconciler,
		workQueue: workQueue,
		syncFlag:  b.dataStoreSyncFlag,
	}

	// Adds the controller to the manager's Runnable
	return b.mgr.Add(controller)
}