in src/go/pkg/cachewarmer/worker.go [33:84]
// RunWorkerManager starts the worker goroutines and then, on every tick,
// refills the local work queue from the worker job queue until ctx is
// cancelled.
//
// It calls syncWaitGroup.Done when it returns. It also adds one entry to
// syncWaitGroup for each worker goroutine it launches; presumably w.worker
// releases that entry when it exits (verify against w.worker).
//
// The job queue is normally polled at most once per timeBetweenWorkerJobCheck.
// When the previous poll yielded a job that was processed and deleted
// successfully, the next tick polls again immediately so a backlog is drained
// without waiting out the full interval.
func (w *Worker) RunWorkerManager(ctx context.Context, syncWaitGroup *sync.WaitGroup) {
	defer syncWaitGroup.Done()
	log.Debug.Printf("[Worker.RunWorkerManager")
	defer log.Debug.Printf("Worker.RunWorkerManager]")

	// initialize random generator
	// NOTE(review): rand.Seed is deprecated since Go 1.20; kept so behavior on
	// older toolchains is unchanged.
	rand.Seed(time.Now().Unix())

	// Back-date the last check so the very first tick is allowed to poll.
	lastJobCheckTime := time.Now().Add(-timeBetweenWorkerJobCheck)
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	workerCount := WorkerMultiplier * runtime.NumCPU()
	// NOTE(review): the message says "orchestrator" but these are w.worker
	// goroutines; left unchanged in case log consumers match on it.
	log.Info.Printf("starting %d orchestrator goroutines", workerCount)
	for i := 0; i < workerCount; i++ {
		syncWaitGroup.Add(1)
		go w.worker(ctx, syncWaitGroup)
	}

	// workAvailable records whether the most recent poll of the job queue
	// produced a job that was fully handled; if so, the interval is skipped.
	workAvailable := false

	// run the infinite loop
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		if !workAvailable && time.Since(lastJobCheckTime) <= timeBetweenWorkerJobCheck {
			continue
		}
		lastJobCheckTime = time.Now()

		// Enough work is already queued locally; leave workAvailable as is so
		// that a refill happens promptly once the local queue drains.
		if w.workQueue.WorkItemCount() >= MinimumJobsBeforeRefill {
			continue
		}

		// Assume nothing is available until this poll proves otherwise, so an
		// empty queue or any failure falls back to the regular interval.
		workAvailable = false

		workerJob, err := w.Queues.GetWorkerJob()
		if err != nil {
			log.Error.Printf("error checking worker job queue: %v", err)
			continue
		}
		if workerJob == nil {
			continue
		}
		if err := w.processWorkerJob(ctx, workerJob); err != nil {
			log.Error.Printf("error processing worker job: %v", err)
			continue
		}
		if err := w.Queues.DeleteWorkerJob(workerJob); err != nil {
			log.Error.Printf("error deleting worker job: %v", err)
			continue
		}

		// A job was fetched, processed and removed: more may be waiting.
		workAvailable = true
	}
}