func()

in src/go/pkg/edasim/orchestrator.go [125:188]


// JobDispatcher pairs ready workers with queued job-start messages until
// o.Context is cancelled.
//
// Each pass drains every pending signal from o.ReadyCh without blocking to
// count the ready workers, dequeues at most that many messages from
// o.JobStartQueue, and forwards each usable message on o.MsgCh. When no
// worker is ready, or the queue returned no messages, it pauses before
// trying again.
//
// syncWaitGroup.Done() is called when the dispatcher returns.
func (o *Orchestrator) JobDispatcher(syncWaitGroup *sync.WaitGroup) {
	log.Debug.Printf("[JobDispatcher\n")
	defer syncWaitGroup.Done()
	defer log.Debug.Printf("JobDispatcher]")

	// Workers that have signalled readiness but have not yet been handed a
	// message. The count is carried over between passes.
	readyWorkerCount := int32(0)

	statsChannel := GetStatsChannel(o.Context)

	for {
		// Collect all pending ready signals without blocking.
		done := false
		for !done {
			select {
			case <-o.Context.Done():
				return
			case <-o.ReadyCh:
				readyWorkerCount++
			default:
				done = true
			}
		}
		if readyWorkerCount == 0 {
			// No workers are ready; back off briefly before polling again.
			time.Sleep(sleepTimeNoWorkers)
			continue
		}

		// Dequeue the messages, with no more than ready workers.
		dequeue, err := o.JobStartQueue.Dequeue(readyWorkerCount, visibilityTimeout)
		if err != nil {
			log.Error.Printf("error dequeuing %d messages from ready queue: %v", readyWorkerCount, err)
			statsChannel.Error()
			continue
		}

		if dequeue.NumMessages() != 0 {
			now := time.Now()
			for m := int32(0); m < dequeue.NumMessages(); m++ {
				msg := dequeue.Message(m)
				// Skip messages whose NextVisibleTime has already passed.
				// NOTE(review): presumably the message may be redelivered
				// to another consumer at that point - confirm.
				if now.After(msg.NextVisibleTime) {
					log.Error.Printf("%v is after, ignoring", msg)
					continue
				}
				// Hand the message to a worker, but give up on cancellation
				// so the dispatcher cannot block forever on a channel that
				// nobody is reading any more.
				select {
				case <-o.Context.Done():
					return
				case o.MsgCh <- msg:
				}
				statsChannel.JobProcessed()
				readyWorkerCount--
			}
		} else {
			// The queue is empty: wait for sleepTimeNoQueueMessages, waking
			// on every tick so cancellation is noticed promptly.
			log.Info.Printf("Dispatcher: no messages, sleeping, %d ready workers", readyWorkerCount)
			ticker := time.NewTicker(sleepTimeNoQueueMessagesTick)
			start := time.Now()
			for time.Since(start) < sleepTimeNoQueueMessages {
				select {
				case <-o.Context.Done():
					// Release the ticker on the cancellation path as well.
					ticker.Stop()
					return
				case <-ticker.C:
				}
			}
			ticker.Stop()
			log.Info.Printf("Dispatcher: awake")
		}
	}
}