pkg/worker/worker.go (58 lines of code) (raw):
// -------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
// --------------------------------------------------------------------------------------------
package worker
import (
"reflect"
"time"
"k8s.io/klog/v2"
"github.com/Azure/application-gateway-kubernetes-ingress/pkg/events"
)
const sleepOnErrorSeconds = 5
const minTimeBetweenUpdates = 1 * time.Second
func drainChan(ch chan events.Event, defaultEvent events.Event) events.Event {
lastEvent := defaultEvent
klog.V(9).Infof("Draining %d events from work channel", len(ch))
for {
select {
case event := <-ch:
// if there are more event in the queue
// we will skip the reconcile event as we should focus on k8s related events
if event.Type != events.PeriodicReconcile {
lastEvent = event
}
default:
return lastEvent
}
}
}
// Run starts the worker which listens for events in eventChannel; stops when stopChannel is closed.
func (w *Worker) Run(work chan events.Event, stopChannel chan struct{}) {
lastUpdate := time.Now().Add(-1 * time.Second)
klog.V(1).Infoln("Worker started")
for {
select {
case event := <-work:
if shouldProcess, reason := w.ShouldProcess(event); !shouldProcess {
if reason != nil {
// This log statement could potentially generate a large amount of log lines and most could be
// innocuous - for instance: "endpoint default/aad-pod-identity-mic is not used by any Ingress"
klog.V(3).Infof("Skipping event. Reason: %s", *reason)
}
continue
}
if event.Value != nil {
// get name, namespace and kind from event.Value
name := reflect.ValueOf(event.Value).Elem().FieldByName("Name").String()
namespace := reflect.ValueOf(event.Value).Elem().FieldByName("Namespace").String()
objectType := reflect.TypeOf(event.Value).Elem()
klog.V(3).Infof("Processing k8s event of type:%s object:%s/%s/%s", event.Type, objectType, namespace, name)
}
since := time.Since(lastUpdate)
if since < minTimeBetweenUpdates {
sleep := minTimeBetweenUpdates - since
klog.V(9).Infof("[worker] It has been %+v since last update; Sleeping for %+v before next update", since, sleep)
time.Sleep(sleep)
}
_ = drainChan(work, event)
if err := w.ProcessEvent(event); err != nil {
klog.Error("Error processing event.", err)
time.Sleep(sleepOnErrorSeconds * time.Second)
}
lastUpdate = time.Now()
case <-stopChannel:
break
}
}
}