in cmd/resmgr/main.go [267:541]
// main is the entry point of the Peloton Resource Manager.
//
// It parses command-line flags and configures JSON logging, then loads the
// config and sets up metrics, storage, RPC transports and middleware. Next it
// builds the resource pool tree plus the task tracker/scheduler, entitlement
// calculator, reconciler, preemptor, host drainer and batch scorer. Finally it
// registers as a leader-election candidate and starts the RPC dispatcher.
// It never returns: the trailing `select {}` blocks forever.
//
// NOTE(review): because main never returns, the deferred cleanup calls in this
// function (scope closer, peer chooser, candidate, dispatcher, runtime-metrics
// stopper) do not run on the normal path. The log.Fatal* calls presumably exit
// the process without running them either. Graceful shutdown would need a
// signal handler; confirm whether that is desired.
func main() {
	app.Version(version)
	app.HelpFlag.Short('h')
	kingpin.MustParse(app.Parse(os.Args[1:]))

	log.SetFormatter(
		&logging.LogFieldFormatter{
			Formatter: &log.JSONFormatter{},
			Fields: log.Fields{
				common.AppLogField: app.Name,
			},
		},
	)

	// The same initial level is also passed to logging.LevelOverwriteHandler
	// below.
	initialLevel := log.InfoLevel
	if *debug {
		initialLevel = log.DebugLevel
	}
	log.SetLevel(initialLevel)

	cfg := getConfig(*cfgFiles...)
	log.WithField("config", cfg).
		Info("Completed Resource Manager config")

	rootScope, scopeCloser, mux := metrics.InitMetricScope(
		&cfg.Metrics,
		common.PelotonResourceManager,
		metrics.TallyFlushInterval,
	)
	defer scopeCloser.Close()

	rootScope.Counter("boot").Inc(1)

	// Register log-level and build-version endpoints on the mux returned by
	// InitMetricScope; the same mux is handed to the inbounds below.
	mux.HandleFunc(logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel))
	mux.HandleFunc(buildversion.Get, buildversion.Handler(version))

	store := stores.MustCreateStore(&cfg.Storage, rootScope)
	ormStore, ormErr := ormobjects.NewCassandraStore(
		cassandra.ToOrmConfig(&cfg.Storage.Cassandra),
		rootScope)
	if ormErr != nil {
		log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra")
	}

	// A single ResPoolOps instance is shared by the resource pool tree and
	// the resource pool service handler.
	respoolOps := ormobjects.NewResPoolOps(ormStore)
	activeJobsOps := ormobjects.NewActiveJobsOps(ormStore)

	// Create both HTTP and GRPC inbounds
	inbounds := rpc.NewInbounds(
		cfg.ResManager.HTTPPort,
		cfg.ResManager.GRPCPort,
		mux,
	)

	// all leader discovery metrics share a scope (and will be tagged
	// with role={role})
	discoveryScope := rootScope.SubScope("discovery")

	// setup the discovery service to detect hostmgr leaders and
	// configure the YARPC Peer dynamically
	t := rpc.NewTransport()
	hostmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		discoveryScope,
		common.HostManagerRole,
		t,
	)
	if err != nil {
		log.
			WithError(err).
			WithField("role", common.HostManagerRole).
			Fatal("Could not create smart peer chooser")
	}
	defer hostmgrPeerChooser.Stop()

	hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)
	outbounds := yarpc.Outbounds{
		common.PelotonHostManager: transport.Outbounds{
			Unary: hostmgrOutbound,
		},
	}

	securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not enable security feature")
	}
	authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)

	yarpcMetricsMiddleware := &inbound.YAPRCMetricsInboundMiddleware{Scope: rootScope.SubScope("yarpc")}

	securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not establish secure inter-component communication")
	}
	authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)

	// Created empty here because the dispatcher needs it now; its nomination
	// is attached later via SetNomination, once the server exists.
	leaderCheckMiddleware := &inbound.LeaderCheckInboundMiddleware{}

	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name:      common.PelotonResourceManager,
		Inbounds:  inbounds,
		Outbounds: outbounds,
		Metrics: yarpc.MetricsConfig{
			Tally: rootScope,
		},
		InboundMiddleware: yarpc.InboundMiddleware{
			Unary:  yarpc.UnaryInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware),
			Oneway: yarpc.OnewayInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware),
			Stream: yarpc.StreamInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware),
		},
		OutboundMiddleware: yarpc.OutboundMiddleware{
			Unary:  authOutboundMiddleware,
			Oneway: authOutboundMiddleware,
			Stream: authOutboundMiddleware,
		},
	})

	hostmgrClient := hostsvc.NewInternalHostServiceYARPCClient(
		dispatcher.ClientConfig(
			common.PelotonHostManager),
	)
	hostServiceClient := pb_hostsvc.NewHostServiceYARPCClient(
		dispatcher.ClientConfig(
			common.PelotonHostManager),
	)

	// Initializing Resource Pool Tree.
	tree := respool.NewTree(
		rootScope,
		respoolOps,
		store, // store implements JobStore
		store, // store implements TaskStore
		*cfg.ResManager.PreemptionConfig)

	// Initialize resource pool service handlers; reuses the same respoolOps
	// instance as the tree.
	respoolsvc.InitServiceHandler(
		dispatcher,
		rootScope,
		tree,
		respoolOps,
	)

	// Initializing the rmtasks in-memory tracker
	task.InitTaskTracker(
		rootScope,
		cfg.ResManager.RmTaskConfig,
	)

	// Initializing the task scheduler
	task.InitScheduler(
		rootScope,
		tree,
		cfg.ResManager.TaskSchedulingPeriod,
		task.GetTracker(),
	)

	// Initializing the entitlement calculator
	calculator := entitlement.NewCalculator(
		cfg.ResManager.EntitlementCaculationPeriod,
		rootScope,
		dispatcher,
		tree,
		cfg.ResManager.HostManagerAPIVersion,
		cfg.ResManager.UseHostPool,
	)

	// Initializing the task reconciler
	reconciler := task.NewReconciler(
		task.GetTracker(),
		store, // store implements TaskStore
		rootScope,
		cfg.ResManager.TaskReconciliationPeriod,
	)

	// Initializing the task preemptor
	preemptor := preemption.NewPreemptor(
		rootScope,
		cfg.ResManager.PreemptionConfig,
		task.GetTracker(),
		tree,
	)

	// Initializing the host drainer
	drainer := maintenance.NewDrainer(
		rootScope,
		hostmgrClient,
		cfg.ResManager.HostDrainerPeriod,
		task.GetTracker(),
		preemptor)

	// Initializing the batch scorer
	batchScorer := hostmover.NewBatchScorer(
		cfg.ResManager.EnableHostScorer,
		hostServiceClient)

	// Initialize resource manager service handlers
	serviceHandler := resmgr.NewServiceHandler(
		dispatcher,
		rootScope,
		task.GetTracker(),
		batchScorer,
		tree,
		preemptor,
		hostmgrClient,
		cfg.ResManager,
	)

	// Initialize recovery
	recoveryHandler := resmgr.NewRecovery(
		rootScope,
		store, // store implements TaskStore
		activeJobsOps,
		ormobjects.NewJobConfigOps(ormStore),
		ormobjects.NewJobRuntimeOps(ormStore),
		serviceHandler,
		tree,
		cfg.ResManager,
		hostmgrClient,
	)

	// Initialize the server
	server := resmgr.NewServer(rootScope,
		cfg.ResManager.HTTPPort,
		cfg.ResManager.GRPCPort,
		tree,
		recoveryHandler,
		calculator,
		reconciler,
		preemptor,
		drainer,
		batchScorer,
	)

	// Set nomination for leader check middleware
	leaderCheckMiddleware.SetNomination(server)

	// Join leader election with the server as the nominee. The candidate is
	// started before the dispatcher, as in the original ordering.
	candidate, err := leader.NewCandidate(
		cfg.Election,
		rootScope,
		common.ResourceManagerRole,
		server,
	)
	if err != nil {
		log.WithError(err).Fatal("Unable to create leader candidate")
	}
	if err := candidate.Start(); err != nil {
		log.WithError(err).Fatal("Unable to start leader candidate")
	}
	defer candidate.Stop()

	// Start dispatch loop
	if err := dispatcher.Start(); err != nil {
		log.WithError(err).Fatal("Unable to start rpc server")
	}
	defer dispatcher.Stop()

	log.WithFields(log.Fields{
		"http_port": cfg.ResManager.HTTPPort,
		"grpc_port": cfg.ResManager.GRPCPort,
	}).Info("Started resource manager")

	// we can *honestly* say the server is booted up now
	health.InitHeartbeat(rootScope, cfg.Health, candidate)

	// start collecting runtime metrics. StartCollectingRuntimeMetrics runs
	// now; only the function it returns is deferred.
	defer metrics.StartCollectingRuntimeMetrics(
		rootScope,
		cfg.Metrics.RuntimeMetrics.Enabled,
		cfg.Metrics.RuntimeMetrics.CollectInterval)()

	// Block forever; main never returns (see the NOTE on main).
	select {}
}