cmd/resmgr/main.go (434 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "os" "github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc" pb_hostsvc "github.com/uber/peloton/.gen/peloton/api/v0/host/svc" "github.com/uber/peloton/pkg/auth" auth_impl "github.com/uber/peloton/pkg/auth/impl" "github.com/uber/peloton/pkg/common" "github.com/uber/peloton/pkg/common/api" "github.com/uber/peloton/pkg/common/buildversion" "github.com/uber/peloton/pkg/common/config" "github.com/uber/peloton/pkg/common/health" "github.com/uber/peloton/pkg/common/leader" "github.com/uber/peloton/pkg/common/logging" "github.com/uber/peloton/pkg/common/metrics" "github.com/uber/peloton/pkg/common/rpc" "github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/peer" "github.com/uber/peloton/pkg/middleware/inbound" "github.com/uber/peloton/pkg/middleware/outbound" "github.com/uber/peloton/pkg/resmgr" "github.com/uber/peloton/pkg/resmgr/entitlement" maintenance "github.com/uber/peloton/pkg/resmgr/host" "github.com/uber/peloton/pkg/resmgr/hostmover" "github.com/uber/peloton/pkg/resmgr/preemption" "github.com/uber/peloton/pkg/resmgr/respool" "github.com/uber/peloton/pkg/resmgr/respool/respoolsvc" "github.com/uber/peloton/pkg/resmgr/task" "github.com/uber/peloton/pkg/storage/cassandra" ormobjects "github.com/uber/peloton/pkg/storage/objects" "github.com/uber/peloton/pkg/storage/stores" log "github.com/sirupsen/logrus" _ "go.uber.org/automaxprocs" "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" kingpin "gopkg.in/alecthomas/kingpin.v2" ) var ( version string app = kingpin.New("peloton-resmgr", "Peloton Resource Manager") debug = app.Flag( "debug", "enable debug mode (print full json responses)"). Short('d'). Default("false"). Envar("ENABLE_DEBUG_LOGGING"). Bool() enableSentry = app.Flag( "enable-sentry", "enable logging hook up to sentry"). Default("false"). Envar("ENABLE_SENTRY_LOGGING"). Bool() cfgFiles = app.Flag( "config", "YAML config files (can be provided multiple times to merge configs)"). Short('c'). Required(). ExistingFiles() electionZkServers = app.Flag( "election-zk-server", "Election Zookeeper servers. Specify multiple times for multiple servers "+ "(election.zk_servers override) (set $ELECTION_ZK_SERVERS to override)"). Envar("ELECTION_ZK_SERVERS"). Strings() httpPort = app.Flag( "http-port", "Resource manager HTTP port (resmgr.http_port override) "+ "(set $HTTP_PORT to override)"). Envar("HTTP_PORT"). Int() grpcPort = app.Flag( "grpc-port", "Resource manager GRPC port (resmgr.grpc_port override) "+ "(set $GRPC_PORT to override)"). Envar("GRPC_PORT"). Int() useCassandra = app.Flag( "use-cassandra", "Use cassandra storage implementation"). Default("true"). Envar("USE_CASSANDRA"). Bool() cassandraHosts = app.Flag( "cassandra-hosts", "Cassandra hosts"). Envar("CASSANDRA_HOSTS"). Strings() cassandraStore = app.Flag( "cassandra-store", "Cassandra store name"). Default(""). Envar("CASSANDRA_STORE"). String() cassandraPort = app.Flag( "cassandra-port", "Cassandra port to connect"). Default("0"). Envar("CASSANDRA_PORT"). Int() pelotonSecretFile = app.Flag( "peloton-secret-file", "Secret file containing all Peloton secrets"). Default(""). Envar("PELOTON_SECRET_FILE"). String() datacenter = app.Flag( "datacenter", "Datacenter name"). Default(""). Envar("DATACENTER"). String() enablePreemption = app.Flag( "enable_preemption", "Enabling preemption"). Default("false"). Envar("ENABLE_PREEMPTION"). Bool() taskPreemptionPeriod = app.Flag( "task_preemption_period", "Setting task preemption period"). Envar("TASK_PREEMPTION_PERIOD"). Duration() enableSLATracking = app.Flag( "enable_sla_tracking", "Enabling SLA tracking"). Default("false"). Envar("ENABLE_SLA_TRACKING"). Bool() authType = app.Flag( "auth-type", "Define the auth type used, default to NOOP"). Default("NOOP"). Envar("AUTH_TYPE"). Enum("NOOP", "BASIC") authConfigFile = app.Flag( "auth-config-file", "config file for the auth feature, which is specific to the auth type used"). Default(""). Envar("AUTH_CONFIG_FILE"). String() hostMgrAPIVersionStr = app.Flag( "hostmgr-api-version", "Define the API Version of host manager"). Default(""). Envar("HOSTMGR_API_VERSION"). String() useHostPool = app.Flag( "use_host_pool", "Using host pool"). Default("false"). Envar("USE_HOST_POOL"). Bool() ) func getConfig(cfgFiles ...string) Config { log.WithField("files", cfgFiles). Info("Loading Resource Manager config") var cfg Config if err := config.Parse(&cfg, cfgFiles...); err != nil { log.WithError(err).Fatal("Cannot parse yaml config") } if *enableSentry { logging.ConfigureSentry(&cfg.SentryConfig) } // now, override any CLI flags in the loaded config.Config if len(*electionZkServers) > 0 { cfg.Election.ZKServers = *electionZkServers } if *httpPort != 0 { cfg.ResManager.HTTPPort = *httpPort } if *grpcPort != 0 { cfg.ResManager.GRPCPort = *grpcPort } if !*useCassandra { cfg.Storage.UseCassandra = false } if *cassandraHosts != nil && len(*cassandraHosts) > 0 { cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts } if *cassandraStore != "" { cfg.Storage.Cassandra.StoreName = *cassandraStore } if *cassandraPort != 0 { cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort } if *datacenter != "" { cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter } // Parse and setup peloton secrets if *pelotonSecretFile != "" { var secretsCfg config.PelotonSecretsConfig if err := config.Parse(&secretsCfg, *pelotonSecretFile); err != nil { log.WithError(err). WithField("peloton_secret_file", *pelotonSecretFile). Fatal("Cannot parse secret config") } cfg.Storage.Cassandra.CassandraConn.Username = secretsCfg.CassandraUsername cfg.Storage.Cassandra.CassandraConn.Password = secretsCfg.CassandraPassword } if *enablePreemption { cfg.ResManager.PreemptionConfig.Enabled = *enablePreemption } if *taskPreemptionPeriod != 0 { cfg.ResManager.PreemptionConfig.TaskPreemptionPeriod = *taskPreemptionPeriod } if *enableSLATracking { cfg.ResManager.RmTaskConfig.EnableSLATracking = *enableSLATracking } if *hostMgrAPIVersionStr != "" { hostMgrAPIVersion, err := api.ParseVersion(*hostMgrAPIVersionStr) if err != nil { log.WithError(err).Fatal("Failed to parse hostmgr-api-version") } cfg.ResManager.HostManagerAPIVersion = hostMgrAPIVersion } if cfg.ResManager.HostManagerAPIVersion == "" { cfg.ResManager.HostManagerAPIVersion = api.V0 } // Parse and setup peloton auth if len(*authType) != 0 { cfg.Auth.AuthType = auth.Type(*authType) cfg.Auth.Path = *authConfigFile } if *useHostPool { cfg.ResManager.UseHostPool = *useHostPool } log. WithField("config", cfg). Info("Loaded Resource Manager config") return cfg } func main() { app.Version(version) app.HelpFlag.Short('h') kingpin.MustParse(app.Parse(os.Args[1:])) log.SetFormatter( &logging.LogFieldFormatter{ Formatter: &log.JSONFormatter{}, Fields: log.Fields{ common.AppLogField: app.Name, }, }, ) initialLevel := log.InfoLevel if *debug { initialLevel = log.DebugLevel } log.SetLevel(initialLevel) cfg := getConfig(*cfgFiles...) log.WithField("config", cfg). Info("Completed Resource Manager config") rootScope, scopeCloser, mux := metrics.InitMetricScope( &cfg.Metrics, common.PelotonResourceManager, metrics.TallyFlushInterval, ) defer scopeCloser.Close() rootScope.Counter("boot").Inc(1) mux.HandleFunc(logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel)) mux.HandleFunc(buildversion.Get, buildversion.Handler(version)) store := stores.MustCreateStore(&cfg.Storage, rootScope) ormStore, ormErr := ormobjects.NewCassandraStore( cassandra.ToOrmConfig(&cfg.Storage.Cassandra), rootScope) if ormErr != nil { log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra") } respoolOps := ormobjects.NewResPoolOps(ormStore) activeJobsOps := ormobjects.NewActiveJobsOps(ormStore) // Create both HTTP and GRPC inbounds inbounds := rpc.NewInbounds( cfg.ResManager.HTTPPort, cfg.ResManager.GRPCPort, mux, ) // all leader discovery metrics share a scope (and will be tagged // with role={role}) discoveryScope := rootScope.SubScope("discovery") // setup the discovery service to detect hostmgr leaders and // configure the YARPC Peer dynamically t := rpc.NewTransport() hostmgrPeerChooser, err := peer.NewSmartChooser( cfg.Election, discoveryScope, common.HostManagerRole, t, ) if err != nil { log. WithError(err). WithField("role", common.HostManagerRole). Fatal("Could not create smart peer chooser") } defer hostmgrPeerChooser.Stop() hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser) outbounds := yarpc.Outbounds{ common.PelotonHostManager: transport.Outbounds{ Unary: hostmgrOutbound, }, } securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth) if err != nil { log.WithError(err). Fatal("Could not enable security feature") } authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager) yarpcMetricsMiddleware := &inbound.YAPRCMetricsInboundMiddleware{Scope: rootScope.SubScope("yarpc")} securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth) if err != nil { log.WithError(err). Fatal("Could not establish secure inter-component communication") } authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient) leaderCheckMiddleware := &inbound.LeaderCheckInboundMiddleware{} dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: common.PelotonResourceManager, Inbounds: inbounds, Outbounds: outbounds, Metrics: yarpc.MetricsConfig{ Tally: rootScope, }, InboundMiddleware: yarpc.InboundMiddleware{ Unary: yarpc.UnaryInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware), Oneway: yarpc.OnewayInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware), Stream: yarpc.StreamInboundMiddleware(authInboundMiddleware, leaderCheckMiddleware, yarpcMetricsMiddleware), }, OutboundMiddleware: yarpc.OutboundMiddleware{ Unary: authOutboundMiddleware, Oneway: authOutboundMiddleware, Stream: authOutboundMiddleware, }, }) hostmgrClient := hostsvc.NewInternalHostServiceYARPCClient( dispatcher.ClientConfig( common.PelotonHostManager), ) hostServiceClient := pb_hostsvc.NewHostServiceYARPCClient( dispatcher.ClientConfig( common.PelotonHostManager), ) // Initializing Resource Pool Tree. tree := respool.NewTree( rootScope, respoolOps, store, // store implements JobStore store, // store implements TaskStore *cfg.ResManager.PreemptionConfig) // Initialize resource pool service handlers respoolsvc.InitServiceHandler( dispatcher, rootScope, tree, ormobjects.NewResPoolOps(ormStore), ) // Initializing the rmtasks in-memory tracker task.InitTaskTracker( rootScope, cfg.ResManager.RmTaskConfig, ) // Initializing the task scheduler task.InitScheduler( rootScope, tree, cfg.ResManager.TaskSchedulingPeriod, task.GetTracker(), ) // Initializing the entitlement calculator calculator := entitlement.NewCalculator( cfg.ResManager.EntitlementCaculationPeriod, rootScope, dispatcher, tree, cfg.ResManager.HostManagerAPIVersion, cfg.ResManager.UseHostPool, ) // Initializing the task reconciler reconciler := task.NewReconciler( task.GetTracker(), store, // store implements TaskStore rootScope, cfg.ResManager.TaskReconciliationPeriod, ) // Initializing the task preemptor preemptor := preemption.NewPreemptor( rootScope, cfg.ResManager.PreemptionConfig, task.GetTracker(), tree, ) // Initializing the host drainer drainer := maintenance.NewDrainer( rootScope, hostmgrClient, cfg.ResManager.HostDrainerPeriod, task.GetTracker(), preemptor) // Initializing the batch scorer batchScorer := hostmover.NewBatchScorer( cfg.ResManager.EnableHostScorer, hostServiceClient) // Initialize resource manager service handlers serviceHandler := resmgr.NewServiceHandler( dispatcher, rootScope, task.GetTracker(), batchScorer, tree, preemptor, hostmgrClient, cfg.ResManager, ) // Initialize recovery recoveryHandler := resmgr.NewRecovery( rootScope, store, // store implements TaskStore activeJobsOps, ormobjects.NewJobConfigOps(ormStore), ormobjects.NewJobRuntimeOps(ormStore), serviceHandler, tree, cfg.ResManager, hostmgrClient, ) // Initialize the server server := resmgr.NewServer(rootScope, cfg.ResManager.HTTPPort, cfg.ResManager.GRPCPort, tree, recoveryHandler, calculator, reconciler, preemptor, drainer, batchScorer, ) // Set nomination for leader check middleware leaderCheckMiddleware.SetNomination(server) candidate, err := leader.NewCandidate( cfg.Election, rootScope, common.ResourceManagerRole, server, ) if err != nil { log.Fatalf("Unable to create leader candidate: %v", err) } if err = candidate.Start(); err != nil { log.Fatalf("Unable to start leader candidate: %v", err) } defer candidate.Stop() // Start dispatch loop if err := dispatcher.Start(); err != nil { log.Fatalf("Unable to start rpc server: %v", err) } defer dispatcher.Stop() log.WithFields(log.Fields{ "http_port": cfg.ResManager.HTTPPort, "grpc_port": cfg.ResManager.GRPCPort, }).Info("Started resource manager") // we can *honestly* say the server is booted up now health.InitHeartbeat(rootScope, cfg.Health, candidate) // start collecting runtime metrics defer metrics.StartCollectingRuntimeMetrics( rootScope, cfg.Metrics.RuntimeMetrics.Enabled, cfg.Metrics.RuntimeMetrics.CollectInterval)() select {} }