func Start()

in pkg/dispatcher/dispatcher.go [208:240]


// Start launches the dispatcher's event loop in a background goroutine.
//
// When the dispatcher already reports itself as running, the call only logs
// that fact and returns. Otherwise it installs a fresh stop channel, bumps the
// stopped counter, spawns the loop goroutine and finally flips the running
// flag to true.
//
// The loop receives from eventChan and passes every event to the handler
// registered for its type (task, application or node); an event of any other
// type is logged at Fatal level. Once a receive on stopChan succeeds, the
// loop clears the running flag, calls Done on the stopped counter and exits.
func Start() {
	log.Log(log.ShimDispatcher).Info("starting the dispatcher")

	d := getDispatcher()
	if d.isRunning() {
		log.Log(log.ShimDispatcher).Info("dispatcher is already running")
		return
	}

	// route hands a single event to the handler registered for its type.
	route := func(incoming interface{}) {
		switch typed := incoming.(type) {
		case events.TaskEvent:
			getEventHandler(EventTypeTask)(typed)
		case events.ApplicationEvent:
			getEventHandler(EventTypeApp)(typed)
		case events.SchedulerNodeEvent:
			getEventHandler(EventTypeNode)(typed)
		default:
			log.Log(log.ShimDispatcher).Fatal("unsupported event",
				zap.Any("event", typed))
		}
	}

	// The stop channel and the stopped counter are prepared before the
	// goroutine is launched, so both exist by the time the loop first runs.
	d.stopChan = make(chan struct{})
	d.stopped.Add(1)

	go func() {
		for {
			// The dispatcher is looked up afresh on every pass rather than
			// captured once by the goroutine.
			select {
			case incoming := <-getDispatcher().eventChan:
				route(incoming)
			case <-getDispatcher().stopChan:
				log.Log(log.ShimDispatcher).Info("shutting down event channel")
				getDispatcher().setRunning(false)
				getDispatcher().stopped.Done()
				return
			}
		}
	}()

	d.setRunning(true)
}