in pkg/k8s/custom_controller.go [103:158]
// StartController assembles a client-go cache.Config for the resource
// described by c.converter, creates the controller with cache.New, stores it
// in c.controller and runs it with stopChanel. The call to Run happens on the
// caller's goroutine (client-go documents Run as blocking until the stop
// channel is closed).
//
// Every batch of deltas handed to Process is walked from oldest to newest.
// Each delta's object is first passed through c.converter.ConvertObject and
// then mirrored into c.dataStore:
//
//   - Sync, Added and Updated deltas become an Update when the object is
//     already present in the store (followed by notifyChannelOnUpdate with the
//     previously stored object), or an Add when it is not (followed by
//     notifyChannelOnCreate).
//   - Deleted deltas become a Delete followed by notifyChannelOnDelete.
//
// The first error from conversion, the store, or a notify hook stops
// processing of the remaining deltas in the batch and is returned to the
// controller.
func (c *CustomController) StartController(stopChanel <-chan struct{}) {
	// podIP is used for log lines only. The assertion is checked so that a
	// converter producing anything other than a non-nil *v1.Pod yields an
	// empty value instead of panicking inside a debug log statement.
	podIP := func(obj interface{}) string {
		if pod, ok := obj.(*v1.Pod); ok && pod != nil {
			return pod.Status.PodIP
		}
		return ""
	}

	config := &cache.Config{
		Queue: c.queue,
		ListerWatcher: newListWatcher(c.clientSet.CoreV1().RESTClient(),
			c.converter.Resource(), c.namespace, c.pageLimit, c.converter, c.log),
		ObjectType:       c.converter.ResourceType(),
		FullResyncPeriod: c.resyncPeriod,
		RetryOnError:     c.retryOnError,
		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(cache.Deltas) {
				// Strip down the object and keep only the required details
				convertedObj, err := c.converter.ConvertObject(d.Object)
				if err != nil {
					return err
				}

				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					c.log.V(1).Info("Received Cache event", "event type", d.Type)

					// The store decides whether this is an update or a create;
					// the delta type alone is not used for that decision.
					old, exists, err := c.dataStore.Get(convertedObj)
					if err != nil {
						return err
					}

					if exists {
						c.log.V(1).Info("Update event", "pod Ip", podIP(convertedObj))
						if err := c.dataStore.Update(convertedObj); err != nil {
							return err
						}
						if err := c.notifyChannelOnUpdate(old, convertedObj); err != nil {
							return err
						}
					} else {
						c.log.V(1).Info("Add/Create event", "pod Ip", podIP(convertedObj))
						if err := c.dataStore.Add(convertedObj); err != nil {
							return err
						}
						if err := c.notifyChannelOnCreate(convertedObj); err != nil {
							return err
						}
					}

				case cache.Deleted:
					c.log.V(1).Info("Delete event", "pod Ip", podIP(convertedObj))
					if err := c.dataStore.Delete(convertedObj); err != nil {
						return err
					}
					if err := c.notifyChannelOnDelete(convertedObj); err != nil {
						return err
					}
				}
			}
			return nil
		},
	}

	c.controller = cache.New(config)
	// Run the controller
	c.controller.Run(stopChanel)
}