in pkg/azure/workqueue.go [104:140]
func (w *Worker) DoWork() bool {
lastUpdate := time.Now().Add(-1 * time.Second)
klog.Info("Worker started")
for {
select {
case <-w.Queue.ctx.Done():
klog.Error("Context cancelled ... :", w.Queue.ctx.Err())
return true
// if job received.
case job := <-w.Queue.jobs:
resourceName := job.Request.NamespacedName.Name
jobsInQueue.Set(resourceName, false)
since := time.Since(lastUpdate)
if since < minTimeBetweenUpdates {
sleep := minTimeBetweenUpdates - since
klog.Infof("[worker] It has been %+v since last update; Sleeping for %+v before next update", since, sleep)
time.Sleep(sleep)
}
nodesWithFwTaint := w.drainChan(job)
node := &corev1.Node{}
if err := w.client.Get(job.ctx, job.Request.NamespacedName, node); err == nil {
nodesWithFwTaint = append(nodesWithFwTaint, node)
}
err := job.Run(nodesWithFwTaint)
if err != nil {
klog.Error("Err in DoWork ... :", err)
continue
}
lastUpdate = time.Now()
}
}
}