cmd/hostmgr/main.go (679 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "net/url" "os" "strings" "time" "github.com/uber/peloton/.gen/peloton/private/resmgrsvc" cqos "github.com/uber/peloton/.gen/qos/v1alpha1" "github.com/uber/peloton/pkg/auth" auth_impl "github.com/uber/peloton/pkg/auth/impl" "github.com/uber/peloton/pkg/common" "github.com/uber/peloton/pkg/common/background" "github.com/uber/peloton/pkg/common/backoff" "github.com/uber/peloton/pkg/common/buildversion" "github.com/uber/peloton/pkg/common/config" "github.com/uber/peloton/pkg/common/health" "github.com/uber/peloton/pkg/common/leader" "github.com/uber/peloton/pkg/common/logging" "github.com/uber/peloton/pkg/common/metrics" "github.com/uber/peloton/pkg/common/rpc" "github.com/uber/peloton/pkg/hostmgr" bin_packing "github.com/uber/peloton/pkg/hostmgr/binpacking" "github.com/uber/peloton/pkg/hostmgr/goalstate" "github.com/uber/peloton/pkg/hostmgr/host" "github.com/uber/peloton/pkg/hostmgr/host/drainer" "github.com/uber/peloton/pkg/hostmgr/hostpool/hostmover" "github.com/uber/peloton/pkg/hostmgr/hostpool/manager" "github.com/uber/peloton/pkg/hostmgr/hostsvc" "github.com/uber/peloton/pkg/hostmgr/mesos" "github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/encoding/mpb" "github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/peer" "github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/transport/mhttp" hostmetric "github.com/uber/peloton/pkg/hostmgr/metrics" "github.com/uber/peloton/pkg/hostmgr/offer" "github.com/uber/peloton/pkg/hostmgr/p2k/hostcache" "github.com/uber/peloton/pkg/hostmgr/p2k/hostmgrsvc" "github.com/uber/peloton/pkg/hostmgr/p2k/plugins" mesosplugins "github.com/uber/peloton/pkg/hostmgr/p2k/plugins/mesos" "github.com/uber/peloton/pkg/hostmgr/p2k/podeventmanager" "github.com/uber/peloton/pkg/hostmgr/p2k/scalar" "github.com/uber/peloton/pkg/hostmgr/queue" "github.com/uber/peloton/pkg/hostmgr/reconcile" "github.com/uber/peloton/pkg/hostmgr/watchevent" "github.com/uber/peloton/pkg/middleware/inbound" "github.com/uber/peloton/pkg/middleware/outbound" "github.com/uber/peloton/pkg/storage/cassandra" ormobjects "github.com/uber/peloton/pkg/storage/objects" "github.com/uber/peloton/pkg/storage/stores" log "github.com/sirupsen/logrus" _ "go.uber.org/automaxprocs" "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" "go.uber.org/yarpc/transport/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) var ( version string app = kingpin.New("peloton-hostmgr", "Peloton Host Manager") debug = app.Flag( "debug", "enable debug mode (print full json responses)"). Short('d'). Default("false"). Envar("ENABLE_DEBUG_LOGGING"). Bool() enableSentry = app.Flag( "enable-sentry", "enable logging hook up to sentry"). Default("false"). Envar("ENABLE_SENTRY_LOGGING"). Bool() configFiles = app.Flag( "config", "YAML config files (can be provided multiple times to merge configs)"). Short('c'). Required(). ExistingFiles() dbHost = app.Flag( "db-host", "Database host (db.host override) (set $DB_HOST to override)"). Envar("DB_HOST"). String() electionZkServers = app.Flag( "election-zk-server", "Election Zookeeper servers. Specify multiple times for multiple servers "+ "(election.zk_servers override) (set $ELECTION_ZK_SERVERS to override)"). Envar("ELECTION_ZK_SERVERS"). Strings() httpPort = app.Flag( "http-port", "Host manager HTTP port (hostmgr.http_port override) "+ "(set $HTTP_PORT to override)"). Envar("HTTP_PORT"). Int() grpcPort = app.Flag( "grpc-port", "Host manager GRPC port (hostmgr.grpc_port override) "+ "(set $GRPC_PORT to override)"). Envar("GRPC_PORT"). Int() zkPath = app.Flag( "zk-path", "Zookeeper path (mesos.zk_host override) (set $MESOS_ZK_PATH to override)"). Envar("MESOS_ZK_PATH"). String() useCassandra = app.Flag( "use-cassandra", "Use cassandra storage implementation"). Default("true"). Envar("USE_CASSANDRA"). Bool() cassandraHosts = app.Flag( "cassandra-hosts", "Cassandra hosts"). Envar("CASSANDRA_HOSTS"). Strings() cassandraStore = app.Flag( "cassandra-store", "Cassandra store name"). Default(""). Envar("CASSANDRA_STORE"). String() cassandraPort = app.Flag( "cassandra-port", "Cassandra port to connect"). Default("0"). Envar("CASSANDRA_PORT"). Int() autoMigrate = app.Flag( "auto-migrate", "Automatically update storage schemas."). Default("false"). Envar("AUTO_MIGRATE"). Bool() datacenter = app.Flag( "datacenter", "Datacenter name"). Default(""). Envar("DATACENTER"). String() mesosSecretFile = app.Flag( "mesos-secret-file", "Secret file containing one-liner password to connect to Mesos master"). Default(""). Envar("MESOS_SECRET_FILE"). String() pelotonSecretFile = app.Flag( "peloton-secret-file", "Secret file containing all Peloton secrets"). Default(""). Envar("PELOTON_SECRET_FILE"). String() scarceResourceTypes = app.Flag( "scarce-resource-type", "Scarce Resource Type."). Envar("SCARCE_RESOURCE_TYPES"). String() slackResourceTypes = app.Flag( "slack-resource-type", "Slack Resource Type."). Envar("SLACK_RESOURCE_TYPES"). String() enableRevocableResources = app.Flag( "enable-revocable-resources", "Revcocable Resources Enabled"). Envar("ENABLE_REVOCABLE_RESOURCES"). Bool() binPacking = app.Flag( "bin_packing", "Bin Packing enable/disable, by default disabled."). Envar("BIN_PACKING"). String() qoSAdvisorServiceAddress = app.Flag( "qos_advisor_service_address", "Qos advisor service address."). Default(""). Envar("QOS_ADVISOR_SERVICE_ADDRESS"). String() authType = app.Flag( "auth-type", "Define the auth type used, default to NOOP"). Default("NOOP"). Envar("AUTH_TYPE"). Enum("NOOP", "BASIC") authConfigFile = app.Flag( "auth-config-file", "config file for the auth feature, which is specific to the auth type used"). Default(""). Envar("AUTH_CONFIG_FILE"). String() kubeConfigFile = app.Flag( "kube-config-file", "YAML config file for kubernetes plugin"). Default(""). Envar("KUBECONFIG"). String() enableK8s = app.Flag( "enable-k8s", "Enable k8s plugin"). Envar("ENABLE_K8S"). Bool() enableHostPool = app.Flag( "enable-host-pool", "Enable Host Pool Management"). Envar("ENABLE_HOST_POOL"). Bool() ) const ( taskEvictionQueueName = "task-eviction-queue" ) func main() { app.Version(version) app.HelpFlag.Short('h') kingpin.MustParse(app.Parse(os.Args[1:])) log.SetFormatter( &logging.LogFieldFormatter{ Formatter: &logging.SecretsFormatter{Formatter: &log.JSONFormatter{}}, Fields: log.Fields{ common.AppLogField: app.Name, }, }, ) initialLevel := log.InfoLevel if *debug { initialLevel = log.DebugLevel } log.SetLevel(initialLevel) log.WithField("files", *configFiles).Info("Loading host manager config") var cfg Config if err := config.Parse(&cfg, *configFiles...); err != nil { log.WithField("error", err).Fatal("Cannot parse yaml config") } if *enableSentry { logging.ConfigureSentry(&cfg.SentryConfig) } // now, override any CLI flags in the loaded config.Config if *httpPort != 0 { cfg.HostManager.HTTPPort = *httpPort } if *grpcPort != 0 { cfg.HostManager.GRPCPort = *grpcPort } if *zkPath != "" { cfg.Mesos.ZkPath = *zkPath } if len(*electionZkServers) > 0 { cfg.Election.ZKServers = *electionZkServers } if !*useCassandra { cfg.Storage.UseCassandra = false } if *cassandraHosts != nil && len(*cassandraHosts) > 0 { cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts } // Parse and setup peloton auth if len(*authType) != 0 { cfg.Auth.AuthType = auth.Type(*authType) cfg.Auth.Path = *authConfigFile } // Parse and setup peloton secrets if *pelotonSecretFile != "" { var secretsCfg config.PelotonSecretsConfig if err := config.Parse(&secretsCfg, *pelotonSecretFile); err != nil { log.WithError(err). WithField("peloton_secret_file", *pelotonSecretFile). Fatal("Cannot parse secret config") } cfg.Storage.Cassandra.CassandraConn.Username = secretsCfg.CassandraUsername cfg.Storage.Cassandra.CassandraConn.Password = secretsCfg.CassandraPassword } if *cassandraStore != "" { cfg.Storage.Cassandra.StoreName = *cassandraStore } if *cassandraPort != 0 { cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort } if *autoMigrate { cfg.Storage.AutoMigrate = *autoMigrate } if *datacenter != "" { cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter } if *scarceResourceTypes != "" { log.Info(strings.Split(*scarceResourceTypes, ",")) cfg.HostManager.ScarceResourceTypes = strings.Split(*scarceResourceTypes, ",") } if *slackResourceTypes != "" { log.Info(strings.Split(*slackResourceTypes, ",")) cfg.HostManager.SlackResourceTypes = strings.Split(*slackResourceTypes, ",") } if *enableRevocableResources { log.Info("Revocable Resource Enabled") cfg.Mesos.Framework.RevocableResourcesSupported = *enableRevocableResources } if *binPacking != "" { log.Info("Bin Packing is enabled") cfg.HostManager.BinPacking = *binPacking } if *enableK8s { cfg.K8s.Enabled = true } if *kubeConfigFile != "" { cfg.K8s.Kubeconfig = *kubeConfigFile } if *qoSAdvisorServiceAddress != "" { cfg.HostManager.QoSAdvisorService.Address = *qoSAdvisorServiceAddress } if *enableHostPool { log.Info("Host Pool Management Enabled") cfg.HostManager.EnableHostPool = *enableHostPool } log.WithField("config", cfg).Info("Loaded Host Manager configuration") rootScope, scopeCloser, mux := metrics.InitMetricScope( &cfg.Metrics, common.PelotonHostManager, metrics.TallyFlushInterval, ) defer scopeCloser.Close() mux.HandleFunc( logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel)) mux.HandleFunc(buildversion.Get, buildversion.Handler(version)) // Create both HTTP and GRPC inbounds inbounds := rpc.NewInbounds( cfg.HostManager.HTTPPort, cfg.HostManager.GRPCPort, mux, ) // TODO: Skip it when k8s is enabled. mesosMasterDetector, err := mesos.NewZKDetector(cfg.Mesos.ZkPath) if err != nil { log.Fatalf("Failed to initialize mesos master detector: %v", err) } // NOTE: we start the server immediately even if no leader has been // detected yet. rootScope.Counter("boot").Inc(1) store := stores.MustCreateStore(&cfg.Storage, rootScope) ormStore, ormErr := ormobjects.NewCassandraStore( cassandra.ToOrmConfig(&cfg.Storage.Cassandra), rootScope) if ormErr != nil { log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra") } activeJobsOps := ormobjects.NewActiveJobsOps(ormStore) authHeader, err := mesos.GetAuthHeader(&cfg.Mesos, *mesosSecretFile) if err != nil { log.WithError(err).Fatal("Cannot initialize auth header") } // Initialize YARPC dispatcher with necessary inbounds and outbounds driver := mesos.InitSchedulerDriver( &cfg.Mesos, store, // store implements FrameworkInfoStore authHeader, ) // Active host manager needs a Mesos inbound var mInbound = mhttp.NewInbound(rootScope, driver) inbounds = append(inbounds, mInbound) // TODO: update Mesos url when leading mesos master changes mOutbound := mhttp.NewOutbound( rootScope, mesosMasterDetector, driver.Endpoint(), authHeader, mhttp.MaxConnectionsPerHost(cfg.Mesos.Framework.MaxConnectionsToMesosMaster), ) // MasterOperatorClient API outbound mOperatorOutbound := mhttp.NewOutbound( rootScope, mesosMasterDetector, url.URL{ Scheme: "http", Path: common.MesosMasterOperatorEndPoint, }, authHeader, mhttp.MaxConnectionsPerHost(cfg.Mesos.Framework.MaxConnectionsToMesosMaster), ) // All leader discovery metrics share a scope (and will be tagged // with role={role}) discoveryScope := rootScope.SubScope("discovery") // TODO: Delete the outbounds from hostmgr to resmgr after switch // eventstream from push to pull (T1014913) // Setup the discovery service to detect resmgr leaders and // configure the YARPC Peer dynamically t := rpc.NewTransport() resmgrPeerChooser, err := peer.NewSmartChooser( cfg.Election, discoveryScope, common.ResourceManagerRole, t, ) if err != nil { log.WithFields(log.Fields{"error": err, "role": common.ResourceManagerRole}). Fatal("Could not create smart peer chooser") } defer resmgrPeerChooser.Stop() resmgrOutbound := t.NewOutbound(resmgrPeerChooser) outbounds := yarpc.Outbounds{ common.MesosMasterScheduler: mOutbound, common.MesosMasterOperator: mOperatorOutbound, common.PelotonResourceManager: transport.Outbounds{ Unary: resmgrOutbound, }, } if cfg.HostManager.QoSAdvisorService.Address != "" { var cqosOutbound *grpc.Outbound // Setup the discovery service to detect QoSAdvisorService cqosTransport := grpc.NewTransport() cqosOutbound = cqosTransport.NewSingleOutbound(cfg.HostManager. QoSAdvisorService.Address) outbounds[common.QoSAdvisorService] = transport.Outbounds{ Unary: cqosOutbound, } } securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth) if err != nil { log.WithError(err). Fatal("Could not enable security feature") } authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager) securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth) if err != nil { log.WithError(err). Fatal("Could not establish secure inter-component communication") } authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient) leaderCheckMiddleware := &inbound.LeaderCheckInboundMiddleware{} dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: common.PelotonHostManager, Inbounds: inbounds, Outbounds: outbounds, Metrics: yarpc.MetricsConfig{ Tally: rootScope, }, InboundMiddleware: yarpc.InboundMiddleware{ Unary: yarpc.UnaryInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware), Oneway: yarpc.OnewayInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware), Stream: yarpc.StreamInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware), }, OutboundMiddleware: yarpc.OutboundMiddleware{ Unary: authOutboundMiddleware, Oneway: authOutboundMiddleware, Stream: authOutboundMiddleware, }, }) // Init the managers driven by the mesos callbacks. // They are driven by the leader who will subscribe to // Mesos callbacks // NOTE: This blocks us to move all Mesos related logic into // hostmgr.Server because schedulerClient uses dispatcher... schedulerClient := mpb.NewSchedulerClient( dispatcher.ClientConfig(common.MesosMasterScheduler), cfg.Mesos.Encoding, ) masterOperatorClient := mpb.NewMasterOperatorClient( dispatcher.ClientConfig(common.MesosMasterOperator), cfg.Mesos.Encoding, ) mesos.InitManager( dispatcher, &cfg.Mesos, store, // store implements FrameworkInfoStore ) var cQosClient cqos.QoSAdvisorServiceYARPCClient if cfg.HostManager.QoSAdvisorService.Address != "" { cQosClient = cqos.NewQoSAdvisorServiceYARPCClient( dispatcher.ClientConfig(common.QoSAdvisorService)) } log.WithFields(log.Fields{ "http_port": cfg.HostManager.HTTPPort, "url_path": common.PelotonEndpointPath, }).Info("HostService initialized") // Declare background works reconciler := reconcile.NewTaskReconciler( schedulerClient, rootScope, driver, activeJobsOps, store, // store implements TaskStore cfg.HostManager.TaskReconcilerConfig, ) ormobjects.InitHostInfoOps(ormStore) loader := host.Loader{ OperatorClient: masterOperatorClient, Scope: rootScope.SubScope("hostmap"), SlackResourceTypes: cfg.HostManager.SlackResourceTypes, HostInfoOps: ormobjects.GetHostInfoOps(), } backgroundManager := background.NewManager() // Retry on hostmap loader with Background Manager. err = backoff.Retry( func() error { return backgroundManager.RegisterWorks( background.Work{ Name: "hostmap", Func: loader.Load, Period: cfg.HostManager.HostmapRefreshInterval, }, ) }, backoff.NewRetryPolicy(cfg.HostManager.HostMgrBackoffRetryCount, time.Duration(cfg.HostManager.HostMgrBackoffRetryIntervalSec)*time.Second), func(error) bool { return true }) if err != nil { log.WithError(err).Fatal("Cannot register hostmap loader background worker.") } // Retry on reconciler registry with Background Manager. err = backoff.Retry( func() error { return backgroundManager.RegisterWorks( background.Work{ Name: "reconciler", Func: reconciler.Reconcile, Period: time.Duration( cfg.HostManager.TaskReconcilerConfig.ReconcileIntervalSec) * time.Second, InitialDelay: time.Duration( cfg.HostManager.TaskReconcilerConfig.InitialReconcileDelaySec) * time.Second, }, ) }, backoff.NewRetryPolicy(cfg.HostManager.HostMgrBackoffRetryCount, time.Duration(cfg.HostManager.HostMgrBackoffRetryIntervalSec)*time.Second), func(error) bool { return true }) if err != nil { log.WithError(err).Fatal("Cannot register reconciler background worker.") } metric := hostmetric.NewMetrics(rootScope) if cfg.HostManager.QoSAdvisorService.Address != "" { bin_packing.Init(cQosClient, metric) } else { bin_packing.Init(nil, nil) } log.WithField("ranker_name", cfg.HostManager.BinPacking). Info("Bin packing is enabled") defaultRanker := bin_packing.GetRankerByName(cfg.HostManager.BinPacking) if defaultRanker == nil { log.WithField("ranker_name", cfg.HostManager.BinPacking). Fatal("Ranker not found") } watchevent.InitWatchProcessor(cfg.HostManager.Watch, metric) watchProcessor := watchevent.GetWatchProcessor() plugin := plugins.NewNoopPlugin() var hostCache hostcache.HostCache podEventCh := make(chan *scalar.PodEvent, plugins.EventChanSize) hostEventCh := make(chan *scalar.HostEvent, plugins.EventChanSize) // If k8s is enabled, return a k8s plugin. // TODO: start MesosPlugin after it's implemented. if cfg.K8s.Enabled { var err error plugin, err = plugins.NewK8sPlugin( cfg.K8s.Kubeconfig, podEventCh, hostEventCh, ) if err != nil { log.WithError(err).Fatal("Cannot init host manager plugin.") } } // a temporary measure to enable mesos plugins for some usecases, // it is not fully ready to be used for v1alpha handler. mesosPlugin := mesosplugins.NewMesosManager( dispatcher, driver, schedulerClient, masterOperatorClient, cfg.HostManager.HostmapRefreshInterval, time.Duration(cfg.HostManager.OfferHoldTimeSec)*time.Second, rootScope, podEventCh, hostEventCh, ) // Initialize offer pool event handler with nil host pool manager. // TODO: Refactor event stream handler and move it out of offer package // to avoid circular dependency, since now offer pool event handler requires // host pool manager, and host pool manager requires event stream handler // which is part of offer pool event handler. offer.InitEventHandler( dispatcher, rootScope, schedulerClient, resmgrsvc.NewResourceManagerServiceYARPCClient( dispatcher.ClientConfig(common.PelotonResourceManager)), backgroundManager, defaultRanker, cfg.HostManager, watchProcessor, nil, mesosPlugin, ) // Construct host pool manager if it is enabled. var hostPoolManager manager.HostPoolManager if cfg.HostManager.EnableHostPool { hostPoolManager = manager.New( cfg.HostManager.HostPoolReconcileInterval, offer.GetEventHandler().GetEventStreamHandler(), ormobjects.GetHostInfoOps(), rootScope, ) // Set host pool manager in offer pool event handler. offer.GetEventHandler().SetHostPoolManager(hostPoolManager) } // Create host cache instance. hostCache = hostcache.New( hostEventCh, backgroundManager, rootScope, ) pem := podeventmanager.New( dispatcher, podEventCh, plugin, hostCache, cfg.HostManager.TaskUpdateBufferSize, rootScope, cfg.K8s.Enabled) // Create v1alpha hostmgr internal service handler. hostmgrsvc.NewServiceHandler( dispatcher, rootScope, plugin, hostCache, pem, ) // Create Goal State Engine driver goalStateDriver := goalstate.NewDriver( ormobjects.GetHostInfoOps(), masterOperatorClient, rootScope, cfg.HostManager.GoalState, hostPoolManager, ) // Create Host mover object hostMover := hostmover.NewHostMover( hostPoolManager, ormobjects.GetHostInfoOps(), goalStateDriver, rootScope, resmgrsvc.NewResourceManagerServiceYARPCClient( dispatcher.ClientConfig(common.PelotonResourceManager)), ) taskEvictionQueue := queue.NewTaskQueue(taskEvictionQueueName) // Create new hostmgr internal service handler. serviceHandler := hostmgr.NewServiceHandler( dispatcher, metric, schedulerClient, masterOperatorClient, driver, cfg.Mesos, mesosMasterDetector, &cfg.HostManager, cfg.HostManager.SlackResourceTypes, watchProcessor, hostPoolManager, goalStateDriver, ormobjects.GetHostInfoOps(), hostCache, mesosPlugin, ) hostDrainer := drainer.NewDrainer( cfg.HostManager.HostDrainerPeriod, cfg.Mesos.Framework.Role, masterOperatorClient, goalStateDriver, ormobjects.GetHostInfoOps(), taskEvictionQueue, ) hostsvc.InitServiceHandler( dispatcher, rootScope, hostDrainer, hostPoolManager, hostMover, ) recoveryHandler := hostmgr.NewRecoveryHandler( rootScope, store, ormStore, hostCache, ) server := hostmgr.NewServer( rootScope, backgroundManager, cfg.HostManager.HTTPPort, cfg.HostManager.GRPCPort, mesosMasterDetector, mInbound, mOutbound, reconciler, recoveryHandler, hostDrainer, serviceHandler.GetReserver(), watchProcessor, plugin, hostCache, mesosPlugin, hostPoolManager, ) server.Start() // Start dispatch loop. if err := dispatcher.Start(); err != nil { log.Fatalf("Could not start rpc server: %v", err) } // Set nomination for leader check middleware leaderCheckMiddleware.SetNomination(server) candidate, err := leader.NewCandidate( cfg.Election, rootScope, common.HostManagerRole, server, ) if err != nil { log.Fatalf("Unable to create leader candidate: %v", err) } err = candidate.Start() if err != nil { log.Fatalf("Unable to start leader candidate: %v", err) } defer candidate.Stop() // Start dispatch loop. if err := dispatcher.Start(); err != nil { log.Fatalf("Could not start rpc server: %v", err) } defer dispatcher.Stop() log.WithFields(log.Fields{ "httpPort": cfg.HostManager.HTTPPort, "grpcPort": cfg.HostManager.GRPCPort, }).Info("Started host manager") // We can *honestly* say the server is booted up now. health.InitHeartbeat(rootScope, cfg.Health, candidate) // Start collecting runtime metrics. defer metrics.StartCollectingRuntimeMetrics( rootScope, cfg.Metrics.RuntimeMetrics.Enabled, cfg.Metrics.RuntimeMetrics.CollectInterval)() select {} }