func()

in pkg/k8s/custom_controller.go [103:158]


func (c *CustomController) StartController(stopChanel <-chan struct{}) {
	config := &cache.Config{
		Queue: c.queue,
		ListerWatcher: newListWatcher(c.clientSet.CoreV1().RESTClient(),
			c.converter.Resource(), c.namespace, c.pageLimit, c.converter, c.log),
		ObjectType:       c.converter.ResourceType(),
		FullResyncPeriod: c.resyncPeriod,
		RetryOnError:     c.retryOnError,
		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(cache.Deltas) {
				// Strip down the pod object and keep only the required details
				convertedObj, err := c.converter.ConvertObject(d.Object)
				if err != nil {
					return err
				}
				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					c.log.V(1).Info("Received Cache event", "event type", d.Type)
					if old, exists, err := c.dataStore.Get(convertedObj); err == nil && exists {
						c.log.V(1).Info("Update event", "pod Ip", convertedObj.(*v1.Pod).Status.PodIP)
						if err := c.dataStore.Update(convertedObj); err != nil {
							return err
						}
						if err := c.notifyChannelOnUpdate(old, convertedObj); err != nil {
							return err
						}
					} else if err == nil && !exists {
						c.log.V(1).Info("Add/Create event", "pod Ip", convertedObj.(*v1.Pod).Status.PodIP)
						if err := c.dataStore.Add(convertedObj); err != nil {
							return err
						}
						if err := c.notifyChannelOnCreate(convertedObj); err != nil {
							return err
						}
					} else {
						return err
					}
				case cache.Deleted:
					c.log.V(1).Info("Delete event", "pod Ip", convertedObj.(*v1.Pod).Status.PodIP)
					if err := c.dataStore.Delete(convertedObj); err != nil {
						return err
					}
					if err := c.notifyChannelOnDelete(convertedObj); err != nil {
						return err
					}
				}
			}
			return nil
		},
	}
	c.controller = cache.New(config)

	// Run the controller
	c.controller.Run(stopChanel)
}