in pkg/worker/worker.go [152:190]
// processNextItem takes one job from the work queue, runs workerFunc on it and
// then decides the job's fate from the outcome.
//
// It returns false only when the queue's Get reports quit; in every other
// case, including when the job fails, it returns true.
//
// Outcomes after workerFunc returns:
//   - error and the job's requeue count has reached maxRetriesOnErr: the job
//     is forgotten and jobsFailedCount is incremented.
//   - error that apierrors.IsNotFound recognises: the job is forgotten without
//     a retry and jobsNotFoundCount is incremented.
//   - any other error: the job is re-added through AddRateLimited.
//   - no error and result.Requeue is set: the job is re-added after
//     result.RequeueAfter.
//   - otherwise: the job is forgotten and jobsCompletedCount is incremented.
func (w *worker) processNextItem() (cont bool) {
	job, quit := w.queue.Get()
	if quit {
		return false
	}
	// Tell the queue we are finished with this job when the function returns,
	// whichever branch below is taken.
	defer w.queue.Done(job)

	// Every record emitted below already carries the "job" key through this
	// logger, so individual log calls must not add it again.
	logger := w.Log.WithValues("job", job)

	result, err := w.workerFunc(job)
	if err != nil {
		retries := w.queue.NumRequeues(job)
		switch {
		case retries >= w.maxRetriesOnErr:
			// The retry budget is checked first, so it wins over the not-found case.
			logger.Error(err, "exceeded maximum retries", "max retries", w.maxRetriesOnErr)
			w.queue.Forget(job)
			jobsFailedCount.WithLabelValues(w.resourceName).Inc()
		case apierrors.IsNotFound(err):
			//similar to upstream https://github.com/kubernetes-sigs/controller-runtime/issues/377#issue-426207628
			logger.Error(err, "won't requeue a not found errored job")
			w.queue.Forget(job)
			jobsNotFoundCount.WithLabelValues(w.resourceName).Inc()
		default:
			logger.Error(err, "re-queuing job", "retry count", retries)
			w.queue.AddRateLimited(job)
		}
		return true
	}

	if result.Requeue {
		// NOTE(review): the job is not forgotten before the timed re-add, so any
		// requeue count accumulated from earlier errors is kept — confirm this
		// is intended.
		logger.V(1).Info("timed retry", "retry after", result.RequeueAfter)
		w.queue.AddAfter(job, result.RequeueAfter)
		return true
	}

	logger.V(1).Info("completed job successfully")
	w.queue.Forget(job)
	jobsCompletedCount.WithLabelValues(w.resourceName).Inc()
	return true
}