// cmd/placement/main.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"os"
"time"
"github.com/uber/peloton/pkg/auth"
auth_impl "github.com/uber/peloton/pkg/auth/impl"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/api"
"github.com/uber/peloton/pkg/common/async"
"github.com/uber/peloton/pkg/common/buildversion"
common_config "github.com/uber/peloton/pkg/common/config"
"github.com/uber/peloton/pkg/common/health"
"github.com/uber/peloton/pkg/common/logging"
"github.com/uber/peloton/pkg/common/metrics"
"github.com/uber/peloton/pkg/common/rpc"
"github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/peer"
"github.com/uber/peloton/pkg/middleware/inbound"
"github.com/uber/peloton/pkg/middleware/outbound"
"github.com/uber/peloton/pkg/placement"
"github.com/uber/peloton/pkg/placement/config"
"github.com/uber/peloton/pkg/placement/hosts"
tally_metrics "github.com/uber/peloton/pkg/placement/metrics"
"github.com/uber/peloton/pkg/placement/offers"
offers_v0 "github.com/uber/peloton/pkg/placement/offers/v0"
offers_v1 "github.com/uber/peloton/pkg/placement/offers/v1"
"github.com/uber/peloton/pkg/placement/plugins"
"github.com/uber/peloton/pkg/placement/plugins/batch"
mimir_strategy "github.com/uber/peloton/pkg/placement/plugins/mimir"
"github.com/uber/peloton/pkg/placement/plugins/mimir/lib/algorithms"
"github.com/uber/peloton/pkg/placement/tasks"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
hostsvc_v1 "github.com/uber/peloton/.gen/peloton/private/hostmgr/v1alpha/svc"
"github.com/uber/peloton/.gen/peloton/private/resmgr"
"github.com/uber/peloton/.gen/peloton/private/resmgrsvc"
log "github.com/sirupsen/logrus"
_ "go.uber.org/automaxprocs"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"gopkg.in/alecthomas/kingpin.v2"
)
// Command-line flags for the placement engine. Each flag can also be set via
// the environment variable named in its Envar; non-zero/non-empty flag values
// override the corresponding fields of the YAML config after parsing in main().
var (
// version is expected to be injected at build time (e.g. via -ldflags);
// it is reported by --version and the buildversion HTTP handler.
version string
app = kingpin.New("peloton-placement", "Peloton Placement Engine")
// Logging knobs.
debug = app.Flag(
"debug", "enable debug mode (print full json responses)").
Short('d').
Default("false").
Envar("ENABLE_DEBUG_LOGGING").
Bool()
enableSentry = app.Flag(
"enable-sentry", "enable logging hook up to sentry").
Default("false").
Envar("ENABLE_SENTRY_LOGGING").
Bool()
// YAML config files; later files merge over earlier ones.
cfgFiles = app.Flag(
"config",
"YAML config files (can be provided multiple times to merge configs)").
Short('c').
Required().
ExistingFiles()
// Zookeeper endpoints for Mesos and leader election.
zkPath = app.Flag(
"zk-path",
"Zookeeper path (mesos.zk_host override) (set $MESOS_ZK_PATH to override)").
Envar("MESOS_ZK_PATH").
String()
electionZkServers = app.Flag(
"election-zk-server",
"Election Zookeeper servers. Specify multiple times for multiple servers "+
"(election.zk_servers override) (set $ELECTION_ZK_SERVERS to override)").
Envar("ELECTION_ZK_SERVERS").
Strings()
// Cassandra storage settings.
useCassandra = app.Flag(
"use-cassandra", "Use cassandra storage implementation").
Default("true").
Envar("USE_CASSANDRA").
Bool()
cassandraHosts = app.Flag(
"cassandra-hosts", "Cassandra hosts").
Envar("CASSANDRA_HOSTS").
Strings()
cassandraStore = app.Flag(
"cassandra-store", "Cassandra store name").
Default("").
Envar("CASSANDRA_STORE").
String()
cassandraPort = app.Flag(
"cassandra-port", "Cassandra port to connect").
Default("0").
Envar("CASSANDRA_PORT").
Int()
// Ports the placement engine serves on.
httpPort = app.Flag(
"http-port",
"Placement engine HTTP port (placement.http_port override) "+
"(set $HTTP_PORT to override)").
Envar("HTTP_PORT").
Int()
grpcPort = app.Flag(
"grpc-port",
"Placement engine GRPC port (placement.grpc_port override) "+
"(set $GRPC_PORT to override)").
Envar("GRPC_PORT").
Int()
datacenter = app.Flag(
"datacenter", "Datacenter name").
Default("").
Envar("DATACENTER").
String()
// Task-placement behavior: taskType also selects the placement strategy
// (see overridePlacementStrategy).
taskType = app.Flag(
"task-type", "Placement engine task type").
Default("BATCH").
Envar("TASK_TYPE").
String()
taskDequeueLimit = app.Flag(
"task-dequeue-limit", "Number of tasks to dequeue").
Envar("TASK_DEQUEUE_LIMIT").
Int()
taskDequeuePeriod = app.Flag(
"task-dequeue-period", "Period at which tasks are dequeued to be placed in seconds").
Envar("TASK_DEQUEUE_PERIOD").
Default("0").
Int()
// Auth configuration.
authType = app.Flag(
"auth-type",
"Define the auth type used, default to NOOP").
Default("NOOP").
Envar("AUTH_TYPE").
Enum("NOOP", "BASIC")
authConfigFile = app.Flag(
"auth-config-file",
"config file for the auth feature, which is specific to the auth type used").
Default("").
Envar("AUTH_CONFIG_FILE").
String()
// Host manager API selection.
hostMgrAPIVersionStr = app.Flag(
"hostmgr-api-version",
"Define the API Version of host manager").
Default("").
Envar("HOSTMGR_API_VERSION").
String()
useHostPool = app.Flag(
"use-host-pool", "Use host pool").
Default("false").
Envar("USE_HOST_POOL").
Bool()
)
// main boots the Peloton placement engine. It parses CLI flags, loads the
// YAML config and applies flag overrides, wires up metrics, leader-election
// based peer choosers for the host manager and resource manager, a YARPC
// dispatcher with auth middleware, and finally starts the placement engine.
// The process then blocks forever until killed.
func main() {
	app.Version(version)
	app.HelpFlag.Short('h')
	kingpin.MustParse(app.Parse(os.Args[1:]))

	// JSON-formatted logs, tagged with the application name on every entry.
	log.SetFormatter(
		&logging.LogFieldFormatter{
			Formatter: &log.JSONFormatter{},
			Fields: log.Fields{
				common.AppLogField: app.Name,
			},
		},
	)

	initialLevel := log.InfoLevel
	if *debug {
		initialLevel = log.DebugLevel
	}
	log.SetLevel(initialLevel)

	log.WithField("files", *cfgFiles).
		Info("Loading Placement Engine config")
	var cfg config.Config
	if err := common_config.Parse(&cfg, *cfgFiles...); err != nil {
		log.WithField("error", err).Fatal("Cannot parse yaml config")
	}

	if *enableSentry {
		logging.ConfigureSentry(&cfg.SentryConfig)
	}

	// Now, override any CLI flags in the loaded config.Config.
	if *zkPath != "" {
		cfg.Mesos.ZkPath = *zkPath
	}
	if len(*electionZkServers) > 0 {
		cfg.Election.ZKServers = *electionZkServers
	}
	if !*useCassandra {
		cfg.Storage.UseCassandra = false
	}
	// len of a nil slice is 0, so no separate nil check is needed.
	if len(*cassandraHosts) > 0 {
		cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts
	}
	if *cassandraStore != "" {
		cfg.Storage.Cassandra.StoreName = *cassandraStore
	}
	if *httpPort != 0 {
		cfg.Placement.HTTPPort = *httpPort
	}
	if *grpcPort != 0 {
		cfg.Placement.GRPCPort = *grpcPort
	}
	if *datacenter != "" {
		cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter
	}
	if *cassandraPort != 0 {
		cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort
	}
	if *taskType != "" {
		// The task type also determines the placement strategy (Mimir vs Batch).
		overridePlacementStrategy(*taskType, &cfg)
	}
	if *taskDequeueLimit != 0 {
		cfg.Placement.TaskDequeueLimit = *taskDequeueLimit
	}
	if *taskDequeuePeriod != 0 {
		cfg.Placement.TaskDequeuePeriod = time.Duration(*taskDequeuePeriod) * time.Second
	}
	if *hostMgrAPIVersionStr != "" {
		hostMgrAPIVersion, err := api.ParseVersion(*hostMgrAPIVersionStr)
		if err != nil {
			log.WithError(err).Fatal("Failed to parse hostmgr-api-version")
		}
		cfg.Placement.HostManagerAPIVersion = hostMgrAPIVersion
	}
	// Default to the v0 host manager API when none was configured.
	if cfg.Placement.HostManagerAPIVersion == "" {
		cfg.Placement.HostManagerAPIVersion = api.V0
	}
	if *useHostPool {
		log.Info("Use Host Pool for placement")
		cfg.Placement.UseHostPool = true
	}
	// Parse and setup peloton auth.
	if len(*authType) != 0 {
		cfg.Auth.AuthType = auth.Type(*authType)
		cfg.Auth.Path = *authConfigFile
	}

	log.WithField("placement_task_type", cfg.Placement.TaskType).
		WithField("strategy", cfg.Placement.Strategy).
		Info("Placement engine type")
	log.WithField("config", cfg).
		Info("Completed Loading Placement Engine config")

	rootScope, scopeCloser, mux := metrics.InitMetricScope(
		&cfg.Metrics,
		common.PelotonPlacement,
		metrics.TallyFlushInterval,
	)
	defer scopeCloser.Close()

	// HTTP handlers for runtime log-level changes and build-version queries.
	mux.HandleFunc(logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel))
	mux.HandleFunc(buildversion.Get, buildversion.Handler(version))

	log.Info("Connecting to HostManager")
	t := rpc.NewTransport()
	hostmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		rootScope,
		common.HostManagerRole,
		t,
	)
	if err != nil {
		log.WithFields(
			log.Fields{
				"error": err,
				"role":  common.HostManagerRole},
		).Fatal("Could not create smart peer chooser for host manager")
	}
	defer hostmgrPeerChooser.Stop()
	hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)

	log.Info("Connecting to ResourceManager")
	resmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		rootScope,
		common.ResourceManagerRole,
		t,
	)
	if err != nil {
		log.WithFields(
			log.Fields{
				"error": err,
				"role":  common.ResourceManagerRole},
		).Fatal("Could not create smart peer chooser for resource manager")
	}
	defer resmgrPeerChooser.Stop()
	resmgrOutbound := t.NewOutbound(resmgrPeerChooser)

	log.Info("Setup the PlacementEngine server")
	// Now attempt to setup the dispatcher.
	outbounds := yarpc.Outbounds{
		common.PelotonResourceManager: transport.Outbounds{
			Unary: resmgrOutbound,
		},
		common.PelotonHostManager: transport.Outbounds{
			Unary: hostmgrOutbound,
		},
	}

	securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not enable security feature")
	}
	authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)

	securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not establish secure inter-component communication")
	}
	authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)

	// Create both HTTP and GRPC inbounds.
	inbounds := rpc.NewInbounds(
		cfg.Placement.HTTPPort,
		cfg.Placement.GRPCPort,
		mux,
	)

	log.Debug("Creating new YARPC dispatcher")
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name:      common.PelotonPlacement,
		Inbounds:  inbounds,
		Outbounds: outbounds,
		Metrics: yarpc.MetricsConfig{
			Tally: rootScope,
		},
		InboundMiddleware: yarpc.InboundMiddleware{
			Unary:  authInboundMiddleware,
			Oneway: authInboundMiddleware,
			Stream: authInboundMiddleware,
		},
		OutboundMiddleware: yarpc.OutboundMiddleware{
			Unary:  authOutboundMiddleware,
			Oneway: authOutboundMiddleware,
			Stream: authOutboundMiddleware,
		},
	})

	log.Debug("Starting YARPC dispatcher")
	if err := dispatcher.Start(); err != nil {
		log.Fatalf("Unable to start dispatcher: %v", err)
	}
	defer dispatcher.Stop()

	tallyMetrics := tally_metrics.NewMetrics(
		rootScope.SubScope("placement"))

	resourceManager := resmgrsvc.NewResourceManagerServiceYARPCClient(
		dispatcher.ClientConfig(common.PelotonResourceManager))
	hostManager := hostsvc.NewInternalHostServiceYARPCClient(
		dispatcher.ClientConfig(common.PelotonHostManager))

	// Pick the offer service matching the configured host manager API version.
	var offerService offers.Service
	if cfg.Placement.HostManagerAPIVersion.IsV1() {
		hostManagerV1 := hostsvc_v1.NewHostManagerServiceYARPCClient(
			dispatcher.ClientConfig(common.PelotonHostManager))
		offerService = offers_v1.NewService(
			hostManagerV1,
			resourceManager,
			tallyMetrics,
		)
	} else {
		offerService = offers_v0.NewService(
			hostManager,
			resourceManager,
			tallyMetrics,
		)
	}

	taskService := tasks.NewService(
		resourceManager,
		&cfg.Placement,
		tallyMetrics,
	)
	hostsService := hosts.NewService(
		hostManager,
		resourceManager,
		tallyMetrics,
	)

	strategy := initPlacementStrategy(cfg)

	// NOTE(review): initPlacementStrategy receives cfg by value, so the
	// Concurrency override it applies for the Mimir strategy never reaches
	// this pool's MaxWorkers — confirm whether that is intended.
	pool := async.NewPool(async.PoolOptions{
		MaxWorkers: cfg.Placement.Concurrency,
	}, nil)
	pool.Start()

	engine := placement.New(
		rootScope,
		&cfg.Placement,
		offerService,
		taskService,
		hostsService,
		strategy,
		pool,
	)
	log.Info("Start the PlacementEngine")
	engine.Start()
	defer engine.Stop()

	log.Info("Initialize the Heartbeat process")
	// we can *honestly* say the server is booted up now
	health.InitHeartbeat(rootScope, cfg.Health, nil)

	// start collecting runtime metrics
	defer metrics.StartCollectingRuntimeMetrics(
		rootScope,
		cfg.Metrics.RuntimeMetrics.Enabled,
		cfg.Metrics.RuntimeMetrics.CollectInterval)()

	// Block forever; all work happens in background goroutines.
	select {}
}
// initPlacementStrategy constructs the placement strategy selected by
// cfg.Placement.Strategy: the batch strategy for config.Batch, the
// Mimir-based strategy for config.Mimir. For any other value it returns
// a nil strategy.
//
// NOTE(review): cfg is passed by value, so the Concurrency = 1 override
// below mutates only this local copy. It is visible to the strategy via
// the &cfg.Placement pointer handed to it, but it never reaches the
// caller's cfg (e.g. the worker pool main() sizes from its own copy) —
// confirm whether that is intended.
func initPlacementStrategy(cfg config.Config) plugins.Strategy {
var strategy plugins.Strategy
switch cfg.Placement.Strategy {
case config.Batch:
strategy = batch.New(&cfg.Placement)
case config.Mimir:
// TODO avyas check mimir concurrency parameters
cfg.Placement.Concurrency = 1
// NewPlacer(4, 300): placer tuning parameters — semantics are defined
// by the mimir algorithms package; presumably iteration/limit knobs,
// verify against that package before changing.
placer := algorithms.NewPlacer(4, 300)
strategy = mimir_strategy.New(placer, &cfg.Placement)
}
return strategy
}
// overridePlacementStrategy records the given task type on the placement
// config and selects the matching placement strategy: Mimir for stateful
// and stateless tasks, Batch for every other task type. An unknown task
// type name terminates the process via log.Fatal.
func overridePlacementStrategy(taskType string, cfg *config.Config) {
	typeValue, known := resmgr.TaskType_value[taskType]
	if !known {
		log.WithField("placement_task_type", taskType).
			Fatal("Invalid placement task type")
	}
	parsed := resmgr.TaskType(typeValue)
	cfg.Placement.TaskType = parsed

	if parsed == resmgr.TaskType_STATEFUL || parsed == resmgr.TaskType_STATELESS {
		// Use mimir strategy for stateful and stateless task placement.
		cfg.Placement.Strategy = config.Mimir
		cfg.Placement.FetchOfferTasks = true
		return
	}
	// Use batch strategy for everything else.
	cfg.Placement.Strategy = config.Batch
	cfg.Placement.FetchOfferTasks = false
}