func()

in src/go/pkg/cachewarmer/worker.go [33:84]


// RunWorkerManager launches the worker goroutines and then polls the worker
// job queue until ctx is cancelled.
//
// It starts WorkerMultiplier * runtime.NumCPU() goroutines running w.worker,
// registering each one on syncWaitGroup before it is started. Afterwards, on
// every ticker tick, if more than timeBetweenWorkerJobCheck has elapsed since
// the previous check and the local work queue holds fewer than
// MinimumJobsBeforeRefill items, it fetches at most one worker job, processes
// it, and deletes it from the queue. Failures are logged and the loop keeps
// going; a job whose processing failed is not deleted.
//
// syncWaitGroup.Done is called for the manager itself on return, so the caller
// is expected to have called Add(1) for it beforehand.
func (w *Worker) RunWorkerManager(ctx context.Context, syncWaitGroup *sync.WaitGroup) {
	defer syncWaitGroup.Done()
	log.Debug.Printf("[Worker.RunWorkerManager")
	defer log.Debug.Printf("Worker.RunWorkerManager]")

	// Seed the package-level random source.
	// NOTE(review): rand.Seed is deprecated as of Go 1.20 — confirm the
	// toolchain version before removing this call.
	rand.Seed(time.Now().Unix())

	// Backdate the last check by one full interval so the queue is inspected
	// as soon as the interval comparison allows, rather than a full interval
	// after start-up.
	lastCheck := time.Now().Add(-timeBetweenWorkerJobCheck)
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	numWorkers := WorkerMultiplier * runtime.NumCPU()
	log.Info.Printf("starting %d orchestrator goroutines", numWorkers)
	for remaining := numWorkers; remaining > 0; remaining-- {
		// Add before launching so the wait group count never lags behind
		// the number of goroutines that have been started.
		syncWaitGroup.Add(1)
		go w.worker(ctx, syncWaitGroup)
	}

	// NOTE(review): workAvailable is never set to true anywhere in this
	// function, so it currently has no effect on the polling cadence.
	workAvailable := false

	for {
		// Block until either shutdown is requested or the next tick arrives.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		// Not yet time for another job-queue check.
		if time.Since(lastCheck) <= timeBetweenWorkerJobCheck && !workAvailable {
			continue
		}
		lastCheck = time.Now()

		// The local queue still has enough work; no refill needed.
		if w.workQueue.WorkItemCount() >= MinimumJobsBeforeRefill {
			continue
		}

		job, fetchErr := w.Queues.GetWorkerJob()
		if fetchErr != nil {
			log.Error.Printf("error checking worker job queue: %v", fetchErr)
			continue
		}
		if job == nil {
			// Nothing queued right now.
			continue
		}

		if procErr := w.processWorkerJob(ctx, job); procErr != nil {
			// Leave the job in the queue; it is only deleted after
			// successful processing.
			log.Error.Printf("error processing worker job: %v", procErr)
			continue
		}

		if delErr := w.Queues.DeleteWorkerJob(job); delErr != nil {
			log.Error.Printf("error deleting worker job: %v", delErr)
		}
	}
}