func()

in pkg/dispatcher/dispatcher.go [141:168]


// asyncDispatch hands event to p.eventChan from a background goroutine.
// Judging by its log messages, it is the fallback used when the event channel
// is full and the event cannot be enqueued directly.
//
// Each call increments asyncDispatchCount; the spawned goroutine decrements it
// again when it exits. If the incremented count exceeds AsyncDispatchLimit the
// function panics before any goroutine is started (the counter is not
// decremented on that path).
//
// The goroutine ends as soon as one of the following happens:
//   - the event is accepted by p.eventChan,
//   - a receive on p.stopChan succeeds,
//   - p.isRunning() reports false at the top of the loop,
//   - at least DispatchTimeout has elapsed since the call; this is only
//     checked once per AsyncDispatchCheckInterval, and an error is logged.
//
// In the last three cases the event is not delivered.
func (p *Dispatcher) asyncDispatch(event events.SchedulingEvent) {
	count := atomic.AddInt32(&asyncDispatchCount, 1)
	log.Log(log.ShimDispatcher).Warn("event channel is full, transition to async-dispatch mode",
		zap.Int32("asyncDispatchCount", count))
	if count > AsyncDispatchLimit {
		panic(fmt.Errorf("dispatcher exceeds async-dispatch limit"))
	}
	go func(beginTime time.Time, stop chan struct{}) {
		defer atomic.AddInt32(&asyncDispatchCount, -1)

		// A single timer is reused for every wait. Calling time.After inside
		// the loop would allocate a new timer per iteration and leave the
		// last one pending when the goroutine exits through another branch.
		timer := time.NewTimer(AsyncDispatchCheckInterval)
		defer timer.Stop()

		for p.isRunning() {
			select {
			case <-stop:
				return
			case p.eventChan <- event:
				return
			case <-timer.C:
				elapseTime := time.Since(beginTime)
				if elapseTime >= DispatchTimeout {
					log.Log(log.ShimDispatcher).Error("dispatch timeout",
						zap.Float64("elapseSeconds", elapseTime.Seconds()))
					return
				}
				log.Log(log.ShimDispatcher).Warn("event channel is full, keep waiting...",
					zap.Float64("elapseSeconds", elapseTime.Seconds()))
				// The timer has fired and its channel was just drained, so
				// Reset can be called directly to arm the next check.
				timer.Reset(AsyncDispatchCheckInterval)
			}
		}
	}(time.Now(), p.stopChan)
}