func()

in pkg/worker/worker.go [152:190]


func (w *worker) processNextItem() (cont bool) {
	job, quit := w.queue.Get()
	if quit {
		return
	}
	defer w.queue.Done(job)
	log := w.Log.WithValues("job", job)

	cont = true

	if result, err := w.workerFunc(job); err != nil {
		if w.queue.NumRequeues(job) >= w.maxRetriesOnErr {
			log.Error(err, "exceeded maximum retries", "max retries", w.maxRetriesOnErr)
			w.queue.Forget(job)
			jobsFailedCount.WithLabelValues(w.resourceName).Inc()
			return
		} else if apierrors.IsNotFound(err) {
			//similar to upstream https://github.com/kubernetes-sigs/controller-runtime/issues/377#issue-426207628
			log.Error(err, "won't requeue a not found errored job", "job", job)
			w.queue.Forget(job)
			jobsNotFoundCount.WithLabelValues(w.resourceName).Inc()
			return
		}
		log.Error(err, "re-queuing job", "retry count", w.queue.NumRequeues(job))
		w.queue.AddRateLimited(job)
		return
	} else if result.Requeue {
		log.V(1).Info("timed retry", "retry after", result.RequeueAfter)
		w.queue.AddAfter(job, result.RequeueAfter)
		return
	}

	log.V(1).Info("completed job successfully")

	w.queue.Forget(job)
	jobsCompletedCount.WithLabelValues(w.resourceName).Inc()

	return
}