func InitEventHandler()

in pkg/hostmgr/offer/handler.go [134:257]


// InitEventHandler builds the package-level offer event handler and wires it
// into the dispatcher d. If the handler has already been initialized, the call
// only logs a warning and returns.
//
// In order, it:
//   - creates the offer pool from hostMgrConfig, ranker, processor and
//     hostPoolManager;
//   - registers the periodic background works with backgroundMgr (placing-host
//     pruner, held-host pruner, bin-packing refresher, pool metrics refresh);
//   - creates the event handler, its event stream handler and the resource
//     manager event forwarder, and starts asynchronous task update processing;
//   - registers the task status update counters refresh work;
//   - registers one procedure per handled scheduler event type on d.
func InitEventHandler(
	d *yarpc.Dispatcher,
	parent tally.Scope,
	schedulerClient mpb.SchedulerClient,
	resmgrClient resmgrsvc.ResourceManagerServiceYARPCClient,
	backgroundMgr background.Manager,
	ranker binpacking.Ranker,
	hostMgrConfig config.Config,
	processor watchevent.WatchProcessor,
	hostPoolManager manager.HostPoolManager,
	mesosPlugin *mesosplugins.MesosManager,
) {
	// One-shot initialization: an existing handler is never replaced.
	if handler != nil {
		log.Warning("Offer event handler has already been initialized")
		return
	}

	// Offer pool shared by the pruners, the refresher and the handler itself.
	poolMetrics := offerpool.NewMetrics(parent)
	offerHoldTime := time.Duration(hostMgrConfig.OfferHoldTimeSec) * time.Second
	sharedPool := offerpool.NewOfferPool(
		offerHoldTime,
		schedulerClient,
		poolMetrics,
		hostmgr_mesos.GetSchedulerDriver(),
		hostMgrConfig.ScarceResourceTypes,
		hostMgrConfig.SlackResourceTypes,
		ranker,
		hostMgrConfig.HostPlacingOfferStatusTimeout,
		processor,
		hostPoolManager,
	)

	// Periodic pruning of placing hosts.
	placingScope := parent.SubScope(_placingHostPrunerName)
	placingPruner := prune.NewPlacingHostPruner(sharedPool, placingScope)
	placingWork := background.Work{
		Name:   _placingHostPrunerName,
		Func:   placingPruner.Prune,
		Period: hostMgrConfig.HostPruningPeriodSec,
	}
	backgroundMgr.RegisterWorks(placingWork)

	// Periodic pruning of held hosts.
	heldScope := parent.SubScope(_heldHostPrunerName)
	heldPruner := prune.NewHeldHostPruner(sharedPool, heldScope)
	heldWork := background.Work{
		Name:   _heldHostPrunerName,
		Func:   heldPruner.Prune,
		Period: hostMgrConfig.HeldHostPruningPeriodSec,
	}
	backgroundMgr.RegisterWorks(heldWork)

	// Periodic bin-packing refresh.
	refresher := offerpool.NewRefresher(sharedPool)
	refreshWork := background.Work{
		Name:   _binPackingRefresherName,
		Func:   refresher.Refresh,
		Period: hostMgrConfig.BinPackingRefreshIntervalSec,
	}
	backgroundMgr.RegisterWorks(refreshWork)

	// Periodic refresh of the pool gauge maps.
	refreshPoolGauges := func(_ *uatomic.Bool) {
		sharedPool.RefreshGaugeMaps()
	}
	backgroundMgr.RegisterWorks(background.Work{
		Name:   _poolMetricsRefresh,
		Func:   refreshPoolGauges,
		Period: _poolMetricsRefreshPeriod,
	})

	//TODO: refactor OfferPruner as a background worker
	offerPruningPeriod := time.Duration(hostMgrConfig.OfferPruningPeriodSec) * time.Second
	updateBufferSize := hostMgrConfig.TaskUpdateBufferSize
	handler = &eventHandler{
		schedulerclient:      schedulerClient,
		watchProcessor:       processor,
		offerPool:            sharedPool,
		offerPruner:          NewOfferPruner(sharedPool, offerPruningPeriod, poolMetrics),
		metrics:              NewMetrics(parent),
		ackChannel:           make(chan *mesos.TaskStatus, updateBufferSize),
		updateAckConcurrency: hostMgrConfig.TaskUpdateAckConcurrency,
		mesosPlugin:          mesosPlugin,
	}

	// The stream handler needs the handler itself, so it is attached after
	// the struct has been created.
	streamScope := parent.SubScope("EventStreamHandler")
	handler.eventStreamHandler = initEventStreamHandler(
		d,
		handler,
		updateBufferSize,
		streamScope,
	)

	forwarderScope := parent.SubScope("ResourceManagerClient")
	initResMgrEventForwarder(handler.eventStreamHandler, resmgrClient, forwarderScope)

	handler.startAsyncProcessTaskUpdates()

	// Periodic refresh of the task status update counters; shares the pool
	// metrics refresh period.
	refreshUpdateCounters := func(_ *uatomic.Bool) {
		handler.UpdateCounters()
	}
	backgroundMgr.RegisterWorks(background.Work{
		Name:   _taskStatusUpdateStreamRefresh,
		Func:   refreshUpdateCounters,
		Period: _poolMetricsRefreshPeriod,
	})

	// Scheduler event type -> handler method, each exposed on the dispatcher
	// under the event type's string name.
	eventProcedures := map[sched.Event_Type]interface{}{
		sched.Event_OFFERS:                handler.Offers,
		sched.Event_INVERSE_OFFERS:        handler.InverseOffers,
		sched.Event_RESCIND:               handler.Rescind,
		sched.Event_RESCIND_INVERSE_OFFER: handler.RescindInverseOffer,
		sched.Event_UPDATE:                handler.Update,
	}
	for eventType, procedure := range eventProcedures {
		mpb.Register(
			d,
			hostmgr_mesos.ServiceName,
			mpb.Procedure(eventType.String(), procedure),
		)
	}
}