in cmd/placement/main.go [185:470]
func main() {
app.Version(version)
app.HelpFlag.Short('h')
kingpin.MustParse(app.Parse(os.Args[1:]))
log.SetFormatter(
&logging.LogFieldFormatter{
Formatter: &log.JSONFormatter{},
Fields: log.Fields{
common.AppLogField: app.Name,
},
},
)
initialLevel := log.InfoLevel
if *debug {
initialLevel = log.DebugLevel
}
log.SetLevel(initialLevel)
log.WithField("files", *cfgFiles).
Info("Loading Placement Engnine config")
var cfg config.Config
if err := common_config.Parse(&cfg, *cfgFiles...); err != nil {
log.WithField("error", err).Fatal("Cannot parse yaml config")
}
if *enableSentry {
logging.ConfigureSentry(&cfg.SentryConfig)
}
// now, override any CLI flags in the loaded config.Config
if *zkPath != "" {
cfg.Mesos.ZkPath = *zkPath
}
if len(*electionZkServers) > 0 {
cfg.Election.ZKServers = *electionZkServers
}
if !*useCassandra {
cfg.Storage.UseCassandra = false
}
if *cassandraHosts != nil && len(*cassandraHosts) > 0 {
cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts
}
if *cassandraStore != "" {
cfg.Storage.Cassandra.StoreName = *cassandraStore
}
if *httpPort != 0 {
cfg.Placement.HTTPPort = *httpPort
}
if *grpcPort != 0 {
cfg.Placement.GRPCPort = *grpcPort
}
if *datacenter != "" {
cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter
}
if *cassandraPort != 0 {
cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort
}
if *taskType != "" {
overridePlacementStrategy(*taskType, &cfg)
}
if *taskDequeueLimit != 0 {
cfg.Placement.TaskDequeueLimit = *taskDequeueLimit
}
if *taskDequeuePeriod != 0 {
cfg.Placement.TaskDequeuePeriod = time.Duration(*taskDequeuePeriod) * time.Second
}
if *hostMgrAPIVersionStr != "" {
hostMgrAPIVersion, err := api.ParseVersion(*hostMgrAPIVersionStr)
if err != nil {
log.WithError(err).Fatal("Failed to parse hostmgr-api-version")
}
cfg.Placement.HostManagerAPIVersion = hostMgrAPIVersion
}
if cfg.Placement.HostManagerAPIVersion == "" {
cfg.Placement.HostManagerAPIVersion = api.V0
}
if *useHostPool {
log.Info("Use Host Pool for placement")
cfg.Placement.UseHostPool = true
}
// Parse and setup peloton auth
if len(*authType) != 0 {
cfg.Auth.AuthType = auth.Type(*authType)
cfg.Auth.Path = *authConfigFile
}
if cfg.Placement.HostManagerAPIVersion == "" {
cfg.Placement.HostManagerAPIVersion = api.V0
}
log.WithField("placement_task_type", cfg.Placement.TaskType).
WithField("strategy", cfg.Placement.Strategy).
Info("Placement engine type")
log.WithField("config", cfg).
Info("Completed Loading Placement Engine config")
rootScope, scopeCloser, mux := metrics.InitMetricScope(
&cfg.Metrics,
common.PelotonPlacement,
metrics.TallyFlushInterval,
)
defer scopeCloser.Close()
mux.HandleFunc(logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel))
mux.HandleFunc(buildversion.Get, buildversion.Handler(version))
log.Info("Connecting to HostManager")
t := rpc.NewTransport()
hostmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
rootScope,
common.HostManagerRole,
t,
)
if err != nil {
log.WithFields(
log.Fields{
"error": err,
"role": common.HostManagerRole},
).Fatal("Could not create smart peer chooser for host manager")
}
defer hostmgrPeerChooser.Stop()
hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)
log.Info("Connecting to ResourceManager")
resmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
rootScope,
common.ResourceManagerRole,
t,
)
if err != nil {
log.WithFields(
log.Fields{
"error": err,
"role": common.ResourceManagerRole},
).Fatal("Could not create smart peer chooser for resource manager")
}
defer resmgrPeerChooser.Stop()
resmgrOutbound := t.NewOutbound(resmgrPeerChooser)
log.Info("Setup the PlacementEngine server")
// Now attempt to setup the dispatcher
outbounds := yarpc.Outbounds{
common.PelotonResourceManager: transport.Outbounds{
Unary: resmgrOutbound,
},
common.PelotonHostManager: transport.Outbounds{
Unary: hostmgrOutbound,
},
}
securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not enable security feature")
}
authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)
securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not establish secure inter-component communication")
}
authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)
// Create both HTTP and GRPC inbounds
inbounds := rpc.NewInbounds(
cfg.Placement.HTTPPort,
cfg.Placement.GRPCPort,
mux,
)
log.Debug("Creating new YARPC dispatcher")
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: common.PelotonPlacement,
Inbounds: inbounds,
Outbounds: outbounds,
Metrics: yarpc.MetricsConfig{
Tally: rootScope,
},
InboundMiddleware: yarpc.InboundMiddleware{
Unary: authInboundMiddleware,
Oneway: authInboundMiddleware,
Stream: authInboundMiddleware,
},
OutboundMiddleware: yarpc.OutboundMiddleware{
Unary: authOutboundMiddleware,
Oneway: authOutboundMiddleware,
Stream: authOutboundMiddleware,
},
})
log.Debug("Starting YARPC dispatcher")
if err := dispatcher.Start(); err != nil {
log.Fatalf("Unable to start dispatcher: %v", err)
}
defer dispatcher.Stop()
tallyMetrics := tally_metrics.NewMetrics(
rootScope.SubScope("placement"))
resourceManager := resmgrsvc.NewResourceManagerServiceYARPCClient(
dispatcher.ClientConfig(common.PelotonResourceManager))
hostManager := hostsvc.NewInternalHostServiceYARPCClient(
dispatcher.ClientConfig(common.PelotonHostManager))
var offerService offers.Service
if cfg.Placement.HostManagerAPIVersion.IsV1() {
hostManagerV1 := hostsvc_v1.NewHostManagerServiceYARPCClient(
dispatcher.ClientConfig(common.PelotonHostManager))
offerService = offers_v1.NewService(
hostManagerV1,
resourceManager,
tallyMetrics,
)
} else {
offerService = offers_v0.NewService(
hostManager,
resourceManager,
tallyMetrics,
)
}
taskService := tasks.NewService(
resourceManager,
&cfg.Placement,
tallyMetrics,
)
hostsService := hosts.NewService(
hostManager,
resourceManager,
tallyMetrics,
)
strategy := initPlacementStrategy(cfg)
pool := async.NewPool(async.PoolOptions{
MaxWorkers: cfg.Placement.Concurrency,
}, nil)
pool.Start()
engine := placement.New(
rootScope,
&cfg.Placement,
offerService,
taskService,
hostsService,
strategy,
pool,
)
log.Info("Start the PlacementEngine")
engine.Start()
defer engine.Stop()
log.Info("Initialize the Heartbeat process")
// we can *honestly* say the server is booted up now
health.InitHeartbeat(rootScope, cfg.Health, nil)
// start collecting runtime metrics
defer metrics.StartCollectingRuntimeMetrics(
rootScope,
cfg.Metrics.RuntimeMetrics.Enabled,
cfg.Metrics.RuntimeMetrics.CollectInterval)()
select {}
}