func()

in pkg/worker/worker.go [38:81]


// Run consumes events from work until stopChannel yields a value or is closed.
//
// For each received event it:
//  1. asks w.ShouldProcess whether the event is relevant and skips it if not;
//  2. waits, if needed, so that at least minTimeBetweenUpdates elapses between
//     two consecutive updates;
//  3. calls drainChan(work, event) before processing (its result is discarded);
//  4. calls w.ProcessEvent and, on error, logs it and sleeps for
//     sleepOnErrorSeconds seconds.
//
// Run blocks the calling goroutine and returns only after a receive on
// stopChannel succeeds. The throttle and error sleeps use time.Sleep, so a stop
// signal that arrives during a sleep is only observed on the next loop
// iteration.
func (w *Worker) Run(work chan events.Event, stopChannel chan struct{}) {
	// Backdate the initial timestamp by one second so the first event is
	// throttled one second less than a full minTimeBetweenUpdates interval.
	lastUpdate := time.Now().Add(-1 * time.Second)
	klog.V(1).Infoln("Worker started")
	for {
		select {
		case event := <-work:
			if shouldProcess, reason := w.ShouldProcess(event); !shouldProcess {
				if reason != nil {
					// This log statement could potentially generate a large amount of log lines and most could be
					// innocuous - for instance: "endpoint default/aad-pod-identity-mic is not used by any Ingress"
					klog.V(3).Infof("Skipping event. Reason: %s", *reason)
				}
				continue
			}

			if event.Value != nil {
				// Extract name, namespace and kind from event.Value via reflection.
				// Only a non-nil pointer to a struct is inspected: Elem and
				// FieldByName panic on other kinds, and a logging aid must not
				// bring the worker down.
				if ptr := reflect.ValueOf(event.Value); ptr.Kind() == reflect.Ptr && !ptr.IsNil() && ptr.Elem().Kind() == reflect.Struct {
					obj := ptr.Elem()
					name := obj.FieldByName("Name").String()
					namespace := obj.FieldByName("Namespace").String()
					objectType := obj.Type()

					klog.V(3).Infof("Processing k8s event of type:%s object:%s/%s/%s", event.Type, objectType, namespace, name)
				}
			}

			// Throttle: never start an update sooner than minTimeBetweenUpdates
			// after the previous one finished.
			since := time.Since(lastUpdate)
			if since < minTimeBetweenUpdates {
				sleep := minTimeBetweenUpdates - since
				klog.V(9).Infof("[worker] It has been %+v since last update; Sleeping for %+v before next update", since, sleep)
				time.Sleep(sleep)
			}

			// NOTE(review): presumably this discards events that queued up while
			// we were waiting so they are coalesced into one update; the return
			// value is intentionally ignored here - confirm against drainChan.
			_ = drainChan(work, event)

			if err := w.ProcessEvent(event); err != nil {
				// Errorf with an explicit separator: klog.Error formats like
				// fmt.Print, which would glue the error text to the message.
				klog.Errorf("Error processing event: %v", err)
				time.Sleep(sleepOnErrorSeconds * time.Second)
			}

			lastUpdate = time.Now()
		case <-stopChannel:
			// A bare break would only leave the select and keep looping;
			// return to actually terminate the worker.
			return
		}
	}
}