in pkg/dispatcher/dispatcher.go [179:211]
func Start() {
log.Log(log.ShimDispatcher).Info("starting the dispatcher")
if getDispatcher().isRunning() {
log.Log(log.ShimDispatcher).Info("dispatcher is already running")
return
}
getDispatcher().stopChan = make(chan struct{})
go func() {
for {
select {
case event := <-getDispatcher().eventChan:
switch v := event.(type) {
case events.TaskEvent:
getEventHandler(EventTypeTask)(v)
case events.ApplicationEvent:
getEventHandler(EventTypeApp)(v)
case events.SchedulerNodeEvent:
getEventHandler(EventTypeNode)(v)
case events.SchedulerEvent:
getEventHandler(EventTypeScheduler)(v)
default:
log.Log(log.ShimDispatcher).Fatal("unsupported event",
zap.Any("event", v))
}
case <-getDispatcher().stopChan:
log.Log(log.ShimDispatcher).Info("shutting down event channel")
getDispatcher().setRunning(false)
return
}
}
}()
getDispatcher().setRunning(true)
}