in src/go/pkg/edasim/orchestrator.go [125:188]
// JobDispatcher runs the dispatch loop that pairs ready workers with queued
// job messages. On each pass it:
//
//  1. drains o.ReadyCh without blocking, counting one ready worker per value
//     received;
//  2. if no worker is ready, sleeps for sleepTimeNoWorkers and starts over;
//  3. calls o.JobStartQueue.Dequeue with the ready-worker count and
//     visibilityTimeout;
//  4. forwards each returned message on o.MsgCh, skipping any message whose
//     NextVisibleTime is already before the time the batch was examined;
//  5. if the dequeue returned no messages, waits roughly
//     sleepTimeNoQueueMessages before polling again.
//
// The loop returns when o.Context is cancelled. syncWaitGroup.Done is called
// on return so the caller can wait for the dispatcher to exit.
func (o *Orchestrator) JobDispatcher(syncWaitGroup *sync.WaitGroup) {
	log.Debug.Printf("[JobDispatcher\n")
	defer syncWaitGroup.Done()
	defer log.Debug.Printf("JobDispatcher]")

	// readyWorkerCount persists across iterations: workers that announced
	// readiness but were not handed a message remain counted.
	readyWorkerCount := int32(0)
	statsChannel := GetStatsChannel(o.Context)

	for {
		// Collect every pending readiness signal without blocking; the
		// default case ends the drain once o.ReadyCh is empty.
		done := false
		for !done {
			select {
			case <-o.Context.Done():
				return
			case <-o.ReadyCh:
				readyWorkerCount++
			default:
				done = true
			}
		}

		if readyWorkerCount == 0 {
			// No workers are ready; back off briefly before checking again.
			time.Sleep(sleepTimeNoWorkers)
			continue
		}

		// Request no more messages than there are ready workers.
		dequeue, err := o.JobStartQueue.Dequeue(readyWorkerCount, visibilityTimeout)
		if err != nil {
			log.Error.Printf("error dequeuing %d messages from ready queue: %v", readyWorkerCount, err)
			statsChannel.Error()
			// NOTE(review): the retry is immediate, with no back-off.
			continue
		}

		if dequeue.NumMessages() != 0 {
			now := time.Now()
			for m := int32(0); m < dequeue.NumMessages(); m++ {
				msg := dequeue.Message(m)
				if now.After(msg.NextVisibleTime) {
					// The message's NextVisibleTime has already passed, so
					// it is not handed to a worker.
					log.Error.Printf("%v is after, ignoring", msg)
					continue
				}
				// Hand the message to a worker, but give up if the context
				// is cancelled first: a bare send could block forever once
				// the workers have exited, which would keep
				// syncWaitGroup.Done from ever running.
				select {
				case <-o.Context.Done():
					return
				case o.MsgCh <- msg:
				}
				statsChannel.JobProcessed()
				readyWorkerCount--
			}
		} else {
			// The queue returned nothing; wait before polling again, waking
			// on every tick so cancellation is noticed promptly.
			log.Info.Printf("Dispatcher: no messages, sleeping, %d ready workers", readyWorkerCount)
			ticker := time.NewTicker(sleepTimeNoQueueMessagesTick)
			start := time.Now()
			for time.Since(start) < sleepTimeNoQueueMessages {
				select {
				case <-o.Context.Done():
					// Release the ticker on the cancellation path as well.
					ticker.Stop()
					return
				case <-ticker.C:
				}
			}
			ticker.Stop()
			log.Info.Printf("Dispatcher: awake")
		}
	}
}