func()

in src/go/pkg/cachewarmer/warmpathmanager.go [271:323]


// RunVMSSManager polls the work queue on a ticker until ctx is cancelled and
// reacts to what it finds:
//
//   - the queue cannot be read: once reads have been failing for longer than
//     failureTimeToDeleteVMSS, EnsureVmssDeleted is called;
//   - the queue is empty: once no job has been seen for longer than
//     timeToDeleteVMSSAfterNoJobs, EnsureVmssDeleted is called;
//   - a job can be peeked: EnsureVmssRunning is called.
//
// syncWaitGroup.Done is called exactly once when the method returns, so the
// caller is expected to have called Add before starting it.
func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *sync.WaitGroup) {
	log.Debug.Printf("[WarmPathManager.RunVMSSManager")
	defer log.Debug.Printf("WarmPathManager.RunVMSSManager]")
	defer syncWaitGroup.Done()

	// Backdate the last check so the very first tick performs a queue check.
	lastWorkerJobCheckTime := time.Now().Add(-timeBetweenJobCheck)
	lastReadQueueSuccess := time.Now()
	lastJobSeen := time.Now()
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	// run the infinite loop
	for {
		select {
		case <-ctx.Done():
			log.Info.Printf("cancelation received")
			return
		case <-ticker.C:
			// The ticker fires every tick, but the queue is only inspected
			// once per timeBetweenJobCheck.
			if time.Since(lastWorkerJobCheckTime) <= timeBetweenJobCheck {
				continue
			}
			lastWorkerJobCheckTime = time.Now()
			log.Info.Printf("VMSS Manager check if worker jobs exist")

			isEmpty, err := m.Queues.IsWorkQueueEmpty()
			if err != nil {
				log.Error.Printf("error checking if work queue was empty: %v", err)
				if time.Since(lastReadQueueSuccess) > failureTimeToDeleteVMSS {
					log.Error.Printf("read worker queue has not been successful for 15 minutes, ensure vmss deleted")
					m.EnsureVmssDeleted(ctx)
					// Restart the failure window so the delete is not
					// re-issued on every subsequent failed check.
					lastReadQueueSuccess = time.Now()
				}
				// A failed read must never be recorded as a success;
				// otherwise the failure window above could never elapse.
				continue
			}

			if isEmpty {
				// jobs do not exist, delete vmss if not already deleted
				if time.Since(lastJobSeen) > timeToDeleteVMSSAfterNoJobs {
					m.EnsureVmssDeleted(ctx)
				}
				lastReadQueueSuccess = time.Now()
				continue
			}

			// jobs exist
			workerJob, peekErr := m.Queues.PeekWorkerJob()
			if peekErr != nil {
				// A failed peek is not counted as a successful queue read.
				log.Error.Printf("error peeking at a worker job: %v", peekErr)
				continue
			}
			if workerJob == nil {
				// Nothing to act on this round; try again on the next check.
				continue
			}
			m.EnsureVmssRunning(ctx)
			lastJobSeen = time.Now()
			lastReadQueueSuccess = time.Now()
		}
	}
}