func main()

in cmd/placement/main.go [185:470]


// main is the entry point of the placement engine process.
//
// It parses the command line, loads the YAML config files and applies the
// command-line overrides on top of them, sets up metrics, the peer choosers
// and outbounds for the host manager and resource manager, the auth
// middleware and the YARPC dispatcher, and finally starts the placement
// engine, the heartbeat and runtime metrics collection.
//
// main does not return: its last statement is an empty select, which blocks
// forever. The deferred cleanups registered below are therefore not reached
// through a normal return of this function.
func main() {
	app.Version(version)
	app.HelpFlag.Short('h')
	kingpin.MustParse(app.Parse(os.Args[1:]))

	// Emit JSON logs and tag every entry with the application name.
	log.SetFormatter(
		&logging.LogFieldFormatter{
			Formatter: &log.JSONFormatter{},
			Fields: log.Fields{
				common.AppLogField: app.Name,
			},
		},
	)

	// initialLevel is also handed to the level-overwrite HTTP handler
	// registered further down.
	initialLevel := log.InfoLevel
	if *debug {
		initialLevel = log.DebugLevel
	}
	log.SetLevel(initialLevel)

	log.WithField("files", *cfgFiles).
		Info("Loading Placement Engine config")
	var cfg config.Config
	if err := common_config.Parse(&cfg, *cfgFiles...); err != nil {
		log.WithError(err).Fatal("Cannot parse yaml config")
	}

	if *enableSentry {
		logging.ConfigureSentry(&cfg.SentryConfig)
	}

	// Values given on the command line take precedence over the values
	// loaded from the config files. A flag left at its zero value keeps
	// whatever the config files provided.
	if *zkPath != "" {
		cfg.Mesos.ZkPath = *zkPath
	}

	if len(*electionZkServers) > 0 {
		cfg.Election.ZKServers = *electionZkServers
	}

	// The flag can only switch Cassandra off; it never switches it on.
	if !*useCassandra {
		cfg.Storage.UseCassandra = false
	}

	if len(*cassandraHosts) > 0 {
		cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts
	}

	if *cassandraStore != "" {
		cfg.Storage.Cassandra.StoreName = *cassandraStore
	}

	if *httpPort != 0 {
		cfg.Placement.HTTPPort = *httpPort
	}

	if *grpcPort != 0 {
		cfg.Placement.GRPCPort = *grpcPort
	}

	if *datacenter != "" {
		cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter
	}

	if *cassandraPort != 0 {
		cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort
	}

	if *taskType != "" {
		overridePlacementStrategy(*taskType, &cfg)
	}

	if *taskDequeueLimit != 0 {
		cfg.Placement.TaskDequeueLimit = *taskDequeueLimit
	}

	// The flag value is a number of seconds.
	if *taskDequeuePeriod != 0 {
		cfg.Placement.TaskDequeuePeriod = time.Duration(*taskDequeuePeriod) * time.Second
	}

	if *hostMgrAPIVersionStr != "" {
		hostMgrAPIVersion, err := api.ParseVersion(*hostMgrAPIVersionStr)
		if err != nil {
			log.WithError(err).Fatal("Failed to parse hostmgr-api-version")
		}
		cfg.Placement.HostManagerAPIVersion = hostMgrAPIVersion
	}
	// Fall back to the v0 host manager API when neither the flag nor the
	// config files selected a version.
	if cfg.Placement.HostManagerAPIVersion == "" {
		cfg.Placement.HostManagerAPIVersion = api.V0
	}

	if *useHostPool {
		log.Info("Use Host Pool for placement")
		cfg.Placement.UseHostPool = true
	}

	// Parse and setup peloton auth
	if len(*authType) != 0 {
		cfg.Auth.AuthType = auth.Type(*authType)
		cfg.Auth.Path = *authConfigFile
	}

	log.WithField("placement_task_type", cfg.Placement.TaskType).
		WithField("strategy", cfg.Placement.Strategy).
		Info("Placement engine type")

	// NOTE(review): this logs the whole config struct; confirm it carries
	// no credentials that should stay out of the logs.
	log.WithField("config", cfg).
		Info("Completed Loading Placement Engine config")

	rootScope, scopeCloser, mux := metrics.InitMetricScope(
		&cfg.Metrics,
		common.PelotonPlacement,
		metrics.TallyFlushInterval,
	)
	defer scopeCloser.Close()

	// Extra HTTP endpoints served from the mux returned with the metrics scope.
	mux.HandleFunc(logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel))
	mux.HandleFunc(buildversion.Get, buildversion.Handler(version))

	// Both peer choosers share the same transport.
	log.Info("Connecting to HostManager")
	t := rpc.NewTransport()
	hostmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		rootScope,
		common.HostManagerRole,
		t,
	)
	if err != nil {
		log.WithFields(
			log.Fields{
				"error": err,
				"role":  common.HostManagerRole,
			},
		).Fatal("Could not create smart peer chooser for host manager")
	}
	defer hostmgrPeerChooser.Stop()

	hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)

	log.Info("Connecting to ResourceManager")
	resmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		rootScope,
		common.ResourceManagerRole,
		t,
	)
	if err != nil {
		log.WithFields(
			log.Fields{
				"error": err,
				"role":  common.ResourceManagerRole,
			},
		).Fatal("Could not create smart peer chooser for resource manager")
	}
	defer resmgrPeerChooser.Stop()

	resmgrOutbound := t.NewOutbound(resmgrPeerChooser)

	log.Info("Setup the PlacementEngine server")
	// Now attempt to setup the dispatcher
	outbounds := yarpc.Outbounds{
		common.PelotonResourceManager: transport.Outbounds{
			Unary: resmgrOutbound,
		},
		common.PelotonHostManager: transport.Outbounds{
			Unary: hostmgrOutbound,
		},
	}

	securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not enable security feature")
	}

	authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)

	securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not establish secure inter-component communication")
	}

	authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)

	// Create both HTTP and GRPC inbounds
	inbounds := rpc.NewInbounds(
		cfg.Placement.HTTPPort,
		cfg.Placement.GRPCPort,
		mux,
	)

	log.Debug("Creating new YARPC dispatcher")
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name:      common.PelotonPlacement,
		Inbounds:  inbounds,
		Outbounds: outbounds,
		Metrics: yarpc.MetricsConfig{
			Tally: rootScope,
		},
		InboundMiddleware: yarpc.InboundMiddleware{
			Unary:  authInboundMiddleware,
			Oneway: authInboundMiddleware,
			Stream: authInboundMiddleware,
		},
		OutboundMiddleware: yarpc.OutboundMiddleware{
			Unary:  authOutboundMiddleware,
			Oneway: authOutboundMiddleware,
			Stream: authOutboundMiddleware,
		},
	})

	log.Debug("Starting YARPC dispatcher")
	if err := dispatcher.Start(); err != nil {
		log.Fatalf("Unable to start dispatcher: %v", err)
	}
	defer dispatcher.Stop()

	tallyMetrics := tally_metrics.NewMetrics(
		rootScope.SubScope("placement"))
	resourceManager := resmgrsvc.NewResourceManagerServiceYARPCClient(
		dispatcher.ClientConfig(common.PelotonResourceManager))
	hostManager := hostsvc.NewInternalHostServiceYARPCClient(
		dispatcher.ClientConfig(common.PelotonHostManager))

	// Pick the offer service implementation matching the configured host
	// manager API version. The v0 hostManager client is created in either
	// case because the hosts service below uses it.
	var offerService offers.Service
	if cfg.Placement.HostManagerAPIVersion.IsV1() {
		hostManagerV1 := hostsvc_v1.NewHostManagerServiceYARPCClient(
			dispatcher.ClientConfig(common.PelotonHostManager))
		offerService = offers_v1.NewService(
			hostManagerV1,
			resourceManager,
			tallyMetrics,
		)
	} else {
		offerService = offers_v0.NewService(
			hostManager,
			resourceManager,
			tallyMetrics,
		)
	}
	taskService := tasks.NewService(
		resourceManager,
		&cfg.Placement,
		tallyMetrics,
	)
	hostsService := hosts.NewService(
		hostManager,
		resourceManager,
		tallyMetrics,
	)

	strategy := initPlacementStrategy(cfg)

	pool := async.NewPool(async.PoolOptions{
		MaxWorkers: cfg.Placement.Concurrency,
	}, nil)
	pool.Start()

	engine := placement.New(
		rootScope,
		&cfg.Placement,
		offerService,
		taskService,
		hostsService,
		strategy,
		pool,
	)
	log.Info("Start the PlacementEngine")
	engine.Start()
	defer engine.Stop()

	log.Info("Initialize the Heartbeat process")
	// we can *honestly* say the server is booted up now
	health.InitHeartbeat(rootScope, cfg.Health, nil)

	// start collecting runtime metrics
	// StartCollectingRuntimeMetrics itself is called right here; only the
	// function it returns is deferred.
	defer metrics.StartCollectingRuntimeMetrics(
		rootScope,
		cfg.Metrics.RuntimeMetrics.Enabled,
		cfg.Metrics.RuntimeMetrics.CollectInterval)()

	// Block forever; the process runs until it is terminated externally.
	select {}
}