in src/go/pkg/azure/vmss.go [93:179]
// Run polls the outstanding VMSS operations tracked in v.vmssFutureMap until
// v.Context is cancelled. It wakes up on every `tick`, snapshots the keys of
// the map, and for each operation whose WaitDuration has elapsed since its
// LastQuery makes a blocking DoneWithContext call to check for completion.
//
// Completed operations are removed from the map. Operations still in flight
// are rescheduled using the delay reported by the future (falling back to
// DEFAULT_OPERATION_POLL_TIME). Failed queries are retried with exponential
// backoff capped at MAX_OPERATION_POLL_TIME.
//
// syncWaitGroup.Done is called when Run returns.
func (v *VmssOperationManager) Run(syncWaitGroup *sync.WaitGroup) {
	defer syncWaitGroup.Done()

	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	// Back-date the stats timestamp so the first tick prints stats.
	lastStatsTime := time.Now().Add(-PrintStatsCycle)

	// loop around all the operations waiting the appropriate time for each operation
	for {
		select {
		case <-v.Context.Done():
			return
		case <-ticker.C:
		}

		// Snapshot the keys so the lock is not held across the blocking calls below.
		v.mux.Lock()
		keys := make([]string, 0, len(v.vmssFutureMap))
		for k := range v.vmssFutureMap {
			keys = append(keys, k)
		}
		v.mux.Unlock()

		// print stats; keys is a private copy so no lock is needed while logging
		if time.Since(lastStatsTime) >= PrintStatsCycle {
			lastStatsTime = time.Now()
			log.Info.Printf("OperationManagerStats: Watching %d operations: %v", len(keys), keys)
		}

		// now iterate through the keys
		for _, k := range keys {
			v.mux.Lock()
			op, ok := v.vmssFutureMap[k]
			if !ok || time.Since(op.LastQuery) < op.WaitDuration {
				// removed since the snapshot, or not yet due for another query
				v.mux.Unlock()
				continue
			}
			future := op.FutureAPI
			v.mux.Unlock()

			// make blocking call
			log.Info.Printf("get operation call for vmss %s", k)
			done, err := future.DoneWithContext(v.Context, v.Client)

			// A cancelled context makes the result meaningless; stop right away
			// instead of recording a failure and walking the remaining keys.
			if v.Context.Err() != nil {
				return
			}

			v.mux.Lock()
			// The map may have changed while the lock was released: re-read the
			// entry and skip it if it is gone or its schedule was refreshed.
			// NOTE(review): the updates to op below only persist if
			// vmssFutureMap stores pointers — confirm against the map type.
			op, ok = v.vmssFutureMap[k]
			if !ok || time.Since(op.LastQuery) < op.WaitDuration {
				v.mux.Unlock()
				continue
			}

			if done {
				// the operation is complete
				log.Info.Printf("operation complete for vmss %s with status %v", k, future.Status())
				delete(v.vmssFutureMap, k)
				v.mux.Unlock()
				continue
			}

			if err == nil {
				// check for Retry-After delay, if not present use the client's polling delay
				delay, hasDelay := future.GetPollingDelay()
				if !hasDelay {
					delay = DEFAULT_OPERATION_POLL_TIME
				}
				op.Attempts = 0
				op.WaitDuration = delay
			} else {
				op.Attempts++
				log.Error.Printf("get operation call for vmss %s, %d attempts with error: %v", k, op.Attempts, err)
				// Exponential backoff: DEFAULT * 2^Attempts, capped at MAX. The
				// product is computed and clamped in float64 because doing the
				// multiplication as a time.Duration overflows int64 after enough
				// failed attempts, producing a negative or zero delay that slips
				// past the cap and causes polling on every tick.
				backoff := float64(DEFAULT_OPERATION_POLL_TIME) * math.Pow(2, float64(op.Attempts))
				if backoff < float64(MAX_OPERATION_POLL_TIME) {
					op.WaitDuration = time.Duration(backoff)
				} else {
					op.WaitDuration = MAX_OPERATION_POLL_TIME
				}
			}
			op.LastQuery = time.Now()
			v.mux.Unlock()
		}
	}
}