func main()

in cmd/resmgr/main.go [267:541]


// main is the entry point of the Peloton resource manager.
//
// It parses command-line flags, configures JSON logging and the metrics
// scope, creates the storage backends (legacy store and ORM store), builds
// the YARPC dispatcher (HTTP and gRPC inbounds, a host manager outbound
// whose peer is discovered via leader election, auth/leader-check/metrics
// middleware), constructs the resource manager components (resource pool
// tree, task tracker and scheduler, entitlement calculator, reconciler,
// preemptor, host drainer, batch scorer, recovery), registers the server as
// a leader-election candidate, starts the dispatcher and then blocks forever.
//
// Every setup failure is reported through log.Fatal/Fatalf, which exits the
// process; deferred cleanup does not run on those paths.
func main() {
	app.Version(version)
	app.HelpFlag.Short('h')
	kingpin.MustParse(app.Parse(os.Args[1:]))

	// Emit JSON logs, tagging every entry with the application name.
	log.SetFormatter(
		&logging.LogFieldFormatter{
			Formatter: &log.JSONFormatter{},
			Fields: log.Fields{
				common.AppLogField: app.Name,
			},
		},
	)

	initialLevel := log.InfoLevel
	if *debug {
		initialLevel = log.DebugLevel
	}
	log.SetLevel(initialLevel)

	cfg := getConfig(*cfgFiles...)

	// NOTE(review): this logs the entire config, including cfg.Auth and
	// cfg.Storage; confirm neither carries credentials that should not end
	// up in logs.
	log.WithField("config", cfg).
		Info("Completed Resource Manager config")

	rootScope, scopeCloser, mux := metrics.InitMetricScope(
		&cfg.Metrics,
		common.PelotonResourceManager,
		metrics.TallyFlushInterval,
	)
	defer scopeCloser.Close()
	rootScope.Counter("boot").Inc(1)

	// Runtime log-level override and build-version endpoints on the metrics mux.
	mux.HandleFunc(logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel))
	mux.HandleFunc(buildversion.Get, buildversion.Handler(version))

	store := stores.MustCreateStore(&cfg.Storage, rootScope)
	ormStore, ormErr := ormobjects.NewCassandraStore(
		cassandra.ToOrmConfig(&cfg.Storage.Cassandra),
		rootScope)
	if ormErr != nil {
		log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra")
	}
	respoolOps := ormobjects.NewResPoolOps(ormStore)
	activeJobsOps := ormobjects.NewActiveJobsOps(ormStore)

	// Create both HTTP and GRPC inbounds
	inbounds := rpc.NewInbounds(
		cfg.ResManager.HTTPPort,
		cfg.ResManager.GRPCPort,
		mux,
	)

	// all leader discovery metrics share a scope (and will be tagged
	// with role={role})
	discoveryScope := rootScope.SubScope("discovery")
	// setup the discovery service to detect hostmgr leaders and
	// configure the YARPC Peer dynamically
	t := rpc.NewTransport()
	hostmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		discoveryScope,
		common.HostManagerRole,
		t,
	)
	if err != nil {
		log.
			WithError(err).
			WithField("role", common.HostManagerRole).
			Fatal("Could not create smart peer chooser")
	}
	defer hostmgrPeerChooser.Stop()

	hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)

	outbounds := yarpc.Outbounds{
		common.PelotonHostManager: transport.Outbounds{
			Unary: hostmgrOutbound,
		},
	}

	securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not enable security feature")
	}

	authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)
	yarpcMetricsMiddleware := &inbound.YAPRCMetricsInboundMiddleware{Scope: rootScope.SubScope("yarpc")}

	securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not establish secure inter-component communication")
	}

	authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)
	// The leader-check middleware must be installed on the dispatcher now,
	// but the server it consults does not exist yet; its nomination is set
	// via SetNomination once the server has been constructed below.
	leaderCheckMiddleware := &inbound.LeaderCheckInboundMiddleware{}

	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name:      common.PelotonResourceManager,
		Inbounds:  inbounds,
		Outbounds: outbounds,
		Metrics: yarpc.MetricsConfig{
			Tally: rootScope,
		},
		InboundMiddleware: yarpc.InboundMiddleware{
			Unary:  yarpc.UnaryInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware),
			Oneway: yarpc.OnewayInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware),
			Stream: yarpc.StreamInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware),
		},
		OutboundMiddleware: yarpc.OutboundMiddleware{
			Unary:  authOutboundMiddleware,
			Oneway: authOutboundMiddleware,
			Stream: authOutboundMiddleware,
		},
	})

	hostmgrClient := hostsvc.NewInternalHostServiceYARPCClient(
		dispatcher.ClientConfig(
			common.PelotonHostManager),
	)

	hostServiceClient := pb_hostsvc.NewHostServiceYARPCClient(
		dispatcher.ClientConfig(
			common.PelotonHostManager),
	)

	// Initializing Resource Pool Tree.
	// NOTE(review): PreemptionConfig is a pointer and is dereferenced here;
	// a nil value would panic. Presumably getConfig always populates it —
	// confirm.
	tree := respool.NewTree(
		rootScope,
		respoolOps,
		store, // store implements JobStore
		store, // store implements TaskStore
		*cfg.ResManager.PreemptionConfig)

	// Initialize resource pool service handlers. Reuse the ResPoolOps the
	// tree was built with instead of constructing a second instance over
	// the same ORM store.
	respoolsvc.InitServiceHandler(
		dispatcher,
		rootScope,
		tree,
		respoolOps,
	)

	// Initializing the rmtasks in-memory tracker
	task.InitTaskTracker(
		rootScope,
		cfg.ResManager.RmTaskConfig,
	)

	// Initializing the task scheduler
	task.InitScheduler(
		rootScope,
		tree,
		cfg.ResManager.TaskSchedulingPeriod,
		task.GetTracker(),
	)

	// Initializing the entitlement calculator
	calculator := entitlement.NewCalculator(
		cfg.ResManager.EntitlementCaculationPeriod,
		rootScope,
		dispatcher,
		tree,
		cfg.ResManager.HostManagerAPIVersion,
		cfg.ResManager.UseHostPool,
	)

	// Initializing the task reconciler
	reconciler := task.NewReconciler(
		task.GetTracker(),
		store, // store implements TaskStore
		rootScope,
		cfg.ResManager.TaskReconciliationPeriod,
	)

	// Initializing the task preemptor
	preemptor := preemption.NewPreemptor(
		rootScope,
		cfg.ResManager.PreemptionConfig,
		task.GetTracker(),
		tree,
	)

	// Initializing the host drainer
	drainer := maintenance.NewDrainer(
		rootScope,
		hostmgrClient,
		cfg.ResManager.HostDrainerPeriod,
		task.GetTracker(),
		preemptor)

	// Initializing the batch scorer
	batchScorer := hostmover.NewBatchScorer(
		cfg.ResManager.EnableHostScorer,
		hostServiceClient)

	// Initialize resource manager service handlers
	serviceHandler := resmgr.NewServiceHandler(
		dispatcher,
		rootScope,
		task.GetTracker(),
		batchScorer,
		tree,
		preemptor,
		hostmgrClient,
		cfg.ResManager,
	)

	// Initialize recovery
	recoveryHandler := resmgr.NewRecovery(
		rootScope,
		store, // store implements TaskStore
		activeJobsOps,
		ormobjects.NewJobConfigOps(ormStore),
		ormobjects.NewJobRuntimeOps(ormStore),
		serviceHandler,
		tree,
		cfg.ResManager,
		hostmgrClient,
	)

	// Initialize the server
	server := resmgr.NewServer(rootScope,
		cfg.ResManager.HTTPPort,
		cfg.ResManager.GRPCPort,
		tree,
		recoveryHandler,
		calculator,
		reconciler,
		preemptor,
		drainer,
		batchScorer,
	)
	// Set nomination for leader check middleware
	leaderCheckMiddleware.SetNomination(server)

	candidate, err := leader.NewCandidate(
		cfg.Election,
		rootScope,
		common.ResourceManagerRole,
		server,
	)

	if err != nil {
		log.Fatalf("Unable to create leader candidate: %v", err)
	}

	if err = candidate.Start(); err != nil {
		log.Fatalf("Unable to start leader candidate: %v", err)
	}
	defer candidate.Stop()

	// Start dispatch loop. Assign to the existing err rather than shadowing
	// it, matching the candidate.Start check above.
	if err = dispatcher.Start(); err != nil {
		log.Fatalf("Unable to start rpc server: %v", err)
	}
	defer dispatcher.Stop()

	log.WithFields(log.Fields{
		"http_port": cfg.ResManager.HTTPPort,
		"grpc_port": cfg.ResManager.GRPCPort,
	}).Info("Started resource manager")

	// we can *honestly* say the server is booted up now
	health.InitHeartbeat(rootScope, cfg.Health, candidate)

	// start collecting runtime metrics. StartCollectingRuntimeMetrics is
	// called immediately; only the function it returns is deferred.
	defer metrics.StartCollectingRuntimeMetrics(
		rootScope,
		cfg.Metrics.RuntimeMetrics.Enabled,
		cfg.Metrics.RuntimeMetrics.CollectInterval)()

	// Block forever. NOTE(review): because this never returns, the deferred
	// Stop/Close calls above do not run when the process is killed by a
	// signal; consider waiting on os/signal instead so leadership is
	// resigned and the dispatcher is stopped on shutdown.
	select {}
}