in pkg/hostmgr/offer/handler.go [134:257]
// InitEventHandler builds the package-level offer event handler and wires it
// into the given dispatcher.
//
// If the package-level handler is already set, a warning is logged and the
// call returns without doing anything else. Otherwise the function:
//
//   - creates the offer pool from the scheduler client, ranker, watch
//     processor, host pool manager and the values in hostMgrConfig;
//   - registers background works for the placing-host pruner, the held-host
//     pruner, the bin-packing refresher and the pool gauge refresh;
//   - assigns the package-level handler, attaches its event stream handler,
//     sets up the resource manager event forwarder and calls
//     startAsyncProcessTaskUpdates;
//   - registers a background work that calls handler.UpdateCounters;
//   - registers the OFFERS, INVERSE_OFFERS, RESCIND, RESCIND_INVERSE_OFFER and
//     UPDATE procedures on the dispatcher under hostmgr_mesos.ServiceName.
//
// NOTE(review): the nil check on handler is not guarded by any lock in this
// function — presumably it is only called once during startup; confirm.
func InitEventHandler(
	d *yarpc.Dispatcher,
	parent tally.Scope,
	schedulerClient mpb.SchedulerClient,
	resmgrClient resmgrsvc.ResourceManagerServiceYARPCClient,
	backgroundMgr background.Manager,
	ranker binpacking.Ranker,
	hostMgrConfig config.Config,
	processor watchevent.WatchProcessor,
	hostPoolManager manager.HostPoolManager,
	mesosPlugin *mesosplugins.MesosManager,
) {
	// Initialization is one-shot: a second call is a no-op apart from the log.
	if handler != nil {
		log.Warning("Offer event handler has already been initialized")
		return
	}

	// Offer pool shared by the pruners, the refresher and the handler itself.
	poolMetrics := offerpool.NewMetrics(parent)
	offerHoldTime := time.Duration(hostMgrConfig.OfferHoldTimeSec) * time.Second
	offers := offerpool.NewOfferPool(
		offerHoldTime,
		schedulerClient,
		poolMetrics,
		hostmgr_mesos.GetSchedulerDriver(),
		hostMgrConfig.ScarceResourceTypes,
		hostMgrConfig.SlackResourceTypes,
		ranker,
		hostMgrConfig.HostPlacingOfferStatusTimeout,
		processor,
		hostPoolManager,
	)

	// Periodic pruning of hosts in placing state.
	placingPruner := prune.NewPlacingHostPruner(
		offers,
		parent.SubScope(_placingHostPrunerName),
	)
	placingPruneWork := background.Work{
		Name:   _placingHostPrunerName,
		Func:   placingPruner.Prune,
		Period: hostMgrConfig.HostPruningPeriodSec,
	}
	backgroundMgr.RegisterWorks(placingPruneWork)

	// Periodic pruning of held hosts.
	heldPruner := prune.NewHeldHostPruner(
		offers,
		parent.SubScope(_heldHostPrunerName),
	)
	heldPruneWork := background.Work{
		Name:   _heldHostPrunerName,
		Func:   heldPruner.Prune,
		Period: hostMgrConfig.HeldHostPruningPeriodSec,
	}
	backgroundMgr.RegisterWorks(heldPruneWork)

	// Periodic bin-packing refresh.
	refresher := offerpool.NewRefresher(offers)
	refreshWork := background.Work{
		Name:   _binPackingRefresherName,
		Func:   refresher.Refresh,
		Period: hostMgrConfig.BinPackingRefreshIntervalSec,
	}
	backgroundMgr.RegisterWorks(refreshWork)

	// Periodic refresh of the pool's gauge maps.
	gaugeWork := background.Work{
		Name: _poolMetricsRefresh,
		Func: func(_ *uatomic.Bool) {
			offers.RefreshGaugeMaps()
		},
		Period: _poolMetricsRefreshPeriod,
	}
	backgroundMgr.RegisterWorks(gaugeWork)

	//TODO: refactor OfferPruner as a background worker
	pruningPeriod := time.Duration(hostMgrConfig.OfferPruningPeriodSec) * time.Second
	offerPruner := NewOfferPruner(offers, pruningPeriod, poolMetrics)
	handlerMetrics := NewMetrics(parent)
	// The ack channel is buffered with the configured task update buffer size.
	acks := make(chan *mesos.TaskStatus, hostMgrConfig.TaskUpdateBufferSize)

	handler = &eventHandler{
		schedulerclient:      schedulerClient,
		watchProcessor:       processor,
		offerPool:            offers,
		offerPruner:          offerPruner,
		metrics:              handlerMetrics,
		ackChannel:           acks,
		updateAckConcurrency: hostMgrConfig.TaskUpdateAckConcurrency,
		mesosPlugin:          mesosPlugin,
	}

	// The event stream handler needs the handler itself, so it is attached
	// only after the package-level variable has been assigned.
	streamScope := parent.SubScope("EventStreamHandler")
	handler.eventStreamHandler = initEventStreamHandler(
		d,
		handler,
		hostMgrConfig.TaskUpdateBufferSize,
		streamScope,
	)

	forwarderScope := parent.SubScope("ResourceManagerClient")
	initResMgrEventForwarder(handler.eventStreamHandler, resmgrClient, forwarderScope)

	handler.startAsyncProcessTaskUpdates()

	// Counter refresh reuses the pool metrics refresh period.
	countersWork := background.Work{
		Name: _taskStatusUpdateStreamRefresh,
		Func: func(_ *uatomic.Bool) {
			handler.UpdateCounters()
		},
		Period: _poolMetricsRefreshPeriod,
	}
	backgroundMgr.RegisterWorks(countersWork)

	// Scheduler event types mapped to the handler methods that serve them.
	callbacks := map[sched.Event_Type]interface{}{
		sched.Event_OFFERS:                handler.Offers,
		sched.Event_INVERSE_OFFERS:        handler.InverseOffers,
		sched.Event_RESCIND:               handler.Rescind,
		sched.Event_RESCIND_INVERSE_OFFER: handler.RescindInverseOffer,
		sched.Event_UPDATE:                handler.Update,
	}
	for eventType, callback := range callbacks {
		mpb.Register(
			d,
			hostmgr_mesos.ServiceName,
			mpb.Procedure(eventType.String(), callback),
		)
	}
}