in pkg/dispatcher/dispatcher.go [170:197]
func (p *Dispatcher) asyncDispatch(event events.SchedulingEvent) {
count := asyncDispatchCount.Add(1)
log.Log(log.ShimDispatcher).Warn("event channel is full, transition to async-dispatch mode",
zap.Int32("asyncDispatchCount", count))
if count > AsyncDispatchLimit {
panic(fmt.Errorf("dispatcher exceeds async-dispatch limit"))
}
go func(beginTime time.Time, stop chan struct{}) {
defer asyncDispatchCount.Add(-1)
for p.isRunning() {
select {
case <-stop:
return
case p.eventChan <- event:
return
case <-time.After(AsyncDispatchCheckInterval):
elapseTime := time.Since(beginTime)
if elapseTime >= DispatchTimeout {
log.Log(log.ShimDispatcher).Error("dispatch timeout",
zap.Float64("elapseSeconds", elapseTime.Seconds()))
return
}
log.Log(log.ShimDispatcher).Warn("event channel is full, keep waiting...",
zap.Float64("elapseSeconds", elapseTime.Seconds()))
}
}
}(time.Now(), p.stopChan)
}