in pkg/worker/worker.go [38:81]
// Run consumes events from work until stopChannel delivers a value or is
// closed, at which point it returns.
//
// For each event received it:
//   - asks w.ShouldProcess whether to handle it, logging the reason at V(3)
//     when the event is skipped;
//   - logs the event type and the object it refers to at V(3);
//   - sleeps if less than minTimeBetweenUpdates has elapsed since the previous
//     event finished processing;
//   - calls drainChan on work and then w.ProcessEvent, sleeping for
//     sleepOnErrorSeconds seconds when ProcessEvent returns an error.
func (w *Worker) Run(work chan events.Event, stopChannel chan struct{}) {
	// Start with a timestamp one second in the past; presumably this keeps the
	// first event from being throttled (depends on minTimeBetweenUpdates).
	lastUpdate := time.Now().Add(-1 * time.Second)
	klog.V(1).Infoln("Worker started")
	for {
		select {
		case event := <-work:
			if shouldProcess, reason := w.ShouldProcess(event); !shouldProcess {
				if reason != nil {
					// This log statement could potentially generate a large amount of log lines and most could be
					// innocuous - for instance: "endpoint default/aad-pod-identity-mic is not used by any Ingress"
					klog.V(3).Infof("Skipping event. Reason: %s", *reason)
				}
				continue
			}

			if event.Value != nil {
				// Read name, namespace and kind from event.Value via reflection.
				// reflect.Indirect plus the Kind check keeps this logging path from
				// panicking when Value is a nil pointer or is not a pointer to a
				// struct; calling Elem() unconditionally panics in those cases.
				if obj := reflect.Indirect(reflect.ValueOf(event.Value)); obj.Kind() == reflect.Struct {
					name := obj.FieldByName("Name").String()
					namespace := obj.FieldByName("Namespace").String()
					klog.V(3).Infof("Processing k8s event of type:%s object:%s/%s/%s", event.Type, obj.Type(), namespace, name)
				} else {
					klog.V(3).Infof("Processing k8s event of type:%s object:%T", event.Type, event.Value)
				}
			}

			// Throttle: keep at least minTimeBetweenUpdates between updates.
			if since := time.Since(lastUpdate); since < minTimeBetweenUpdates {
				sleep := minTimeBetweenUpdates - since
				klog.V(9).Infof("[worker] It has been %+v since last update; Sleeping for %+v before next update", since, sleep)
				time.Sleep(sleep)
			}

			// The result of drainChan is deliberately discarded, as in the
			// original code. NOTE(review): presumably drainChan empties events
			// already queued on work so one update covers them - confirm
			// against its definition.
			_ = drainChan(work, event)

			if err := w.ProcessEvent(event); err != nil {
				// Errorf with an explicit separator: klog.Error follows fmt.Print
				// spacing rules and would glue the error onto the message.
				klog.Errorf("Error processing event. %v", err)
				time.Sleep(sleepOnErrorSeconds * time.Second)
			}

			lastUpdate = time.Now()
		case <-stopChannel:
			// A bare break here would only leave the select, not the for loop,
			// so the worker would never stop; return ends the loop.
			return
		}
	}
}