in src/go/pkg/cachewarmer/warmpathmanager.go [271:323]
// RunVMSSManager runs the VMSS management loop until ctx is cancelled, and
// calls syncWaitGroup.Done when it returns.
//
// On every tick it checks whether at least timeBetweenJobCheck has elapsed
// since the previous check and, if so, inspects the work queue:
//
//   - If the queue check fails, the failure is logged. Once no check has
//     succeeded for longer than failureTimeToDeleteVMSS, EnsureVmssDeleted is
//     called and the failure window is restarted.
//   - If the queue is empty and no job has been seen for longer than
//     timeToDeleteVMSSAfterNoJobs, EnsureVmssDeleted is called.
//   - If the queue is non-empty and a worker job can be peeked,
//     EnsureVmssRunning is called and the job-seen time is refreshed.
func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *sync.WaitGroup) {
	log.Debug.Printf("[WarmPathManager.RunVMSSManager")
	defer log.Debug.Printf("WarmPathManager.RunVMSSManager]")
	defer syncWaitGroup.Done()

	// Backdate the last check so the first tick performs a check right away.
	lastWorkerJobCheckTime := time.Now().Add(-timeBetweenJobCheck)
	lastReadQueueSuccess := time.Now()
	lastJobSeen := time.Now()

	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Info.Printf("cancelation received")
			return
		case <-ticker.C:
			if time.Since(lastWorkerJobCheckTime) <= timeBetweenJobCheck {
				continue
			}
			lastWorkerJobCheckTime = time.Now()
			log.Info.Printf("VMSS Manager check if worker jobs exist")

			isEmpty, err := m.Queues.IsWorkQueueEmpty()
			if err != nil {
				log.Error.Printf("error checking if work queue was empty: %v", err)
				if time.Since(lastReadQueueSuccess) > failureTimeToDeleteVMSS {
					// NOTE(review): the message hard-codes "15 minutes"; confirm it
					// matches the value of failureTimeToDeleteVMSS.
					log.Error.Printf("read worker queue has not been successful for 15 minutes, ensure vmss deleted")
					m.EnsureVmssDeleted(ctx)
					// Restart the failure window so the delete is not retried on
					// every subsequent failed check.
					lastReadQueueSuccess = time.Now()
				}
				// A failed check must never reach the success bookkeeping at the
				// bottom of this case; otherwise each failure would reset the
				// failure window and the threshold above could never be exceeded.
				continue
			}

			if isEmpty {
				// No jobs exist; delete the VMSS once the idle period has elapsed.
				if time.Since(lastJobSeen) > timeToDeleteVMSSAfterNoJobs {
					m.EnsureVmssDeleted(ctx)
				}
			} else {
				// Jobs exist; confirm one can be peeked before starting the VMSS.
				workerJob, peekErr := m.Queues.PeekWorkerJob()
				if peekErr != nil {
					log.Error.Printf("error peeking at a worker job: %v", peekErr)
					continue
				}
				if workerJob == nil {
					continue
				}
				m.EnsureVmssRunning(ctx)
				lastJobSeen = time.Now()
			}

			lastReadQueueSuccess = time.Now()
		}
	}
}