// cmd/jobmgr/main.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"net/http"
"os"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/private/hostmgr/hostsvc"
"github.com/uber/peloton/pkg/auth"
auth_impl "github.com/uber/peloton/pkg/auth/impl"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/api"
"github.com/uber/peloton/pkg/common/background"
"github.com/uber/peloton/pkg/common/buildversion"
"github.com/uber/peloton/pkg/common/config"
"github.com/uber/peloton/pkg/common/health"
"github.com/uber/peloton/pkg/common/leader"
"github.com/uber/peloton/pkg/common/logging"
"github.com/uber/peloton/pkg/common/metrics"
"github.com/uber/peloton/pkg/common/rpc"
"github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/peer"
"github.com/uber/peloton/pkg/jobmgr"
"github.com/uber/peloton/pkg/jobmgr/adminsvc"
"github.com/uber/peloton/pkg/jobmgr/cached"
"github.com/uber/peloton/pkg/jobmgr/goalstate"
"github.com/uber/peloton/pkg/jobmgr/jobsvc"
"github.com/uber/peloton/pkg/jobmgr/jobsvc/private"
"github.com/uber/peloton/pkg/jobmgr/jobsvc/stateless"
"github.com/uber/peloton/pkg/jobmgr/logmanager"
"github.com/uber/peloton/pkg/jobmgr/podsvc"
"github.com/uber/peloton/pkg/jobmgr/task/activermtask"
"github.com/uber/peloton/pkg/jobmgr/task/deadline"
"github.com/uber/peloton/pkg/jobmgr/task/event"
"github.com/uber/peloton/pkg/jobmgr/task/evictor"
"github.com/uber/peloton/pkg/jobmgr/task/placement"
"github.com/uber/peloton/pkg/jobmgr/tasksvc"
"github.com/uber/peloton/pkg/jobmgr/updatesvc"
"github.com/uber/peloton/pkg/jobmgr/volumesvc"
"github.com/uber/peloton/pkg/jobmgr/watchsvc"
"github.com/uber/peloton/pkg/jobmgr/workflow/progress"
"github.com/uber/peloton/pkg/middleware/inbound"
"github.com/uber/peloton/pkg/middleware/outbound"
"github.com/uber/peloton/pkg/storage/cassandra"
ormobjects "github.com/uber/peloton/pkg/storage/objects"
"github.com/uber/peloton/pkg/storage/stores"
log "github.com/sirupsen/logrus"
"github.com/uber-go/atomic"
_ "go.uber.org/automaxprocs"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"golang.org/x/time/rate"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
const (
_httpClientTimeout = 15 * time.Second
)
var (
version string
app = kingpin.New(common.PelotonJobManager, "Peloton Job Manager")
debug = app.Flag(
"debug", "enable debug mode (print full json responses)").
Short('d').
Default("false").
Envar("ENABLE_DEBUG_LOGGING").
Bool()
enableSentry = app.Flag(
"enable-sentry", "enable logging hook up to sentry").
Default("false").
Envar("ENABLE_SENTRY_LOGGING").
Bool()
cfgFiles = app.Flag(
"config",
"YAML config files (can be provided multiple times to merge configs)").
Short('c').
Required().
ExistingFiles()
dbHost = app.Flag(
"db-host",
"Database host (db.host override) (set $DB_HOST to override)").
Envar("DB_HOST").
String()
electionZkServers = app.Flag(
"election-zk-server",
"Election Zookeeper servers. Specify multiple times for multiple servers "+
"(election.zk_servers override) (set $ELECTION_ZK_SERVERS to override)").
Envar("ELECTION_ZK_SERVERS").
Strings()
httpPort = app.Flag(
"http-port", "Job manager HTTP port (jobmgr.http_port override) "+
"(set $PORT to override)").
Envar("HTTP_PORT").
Int()
grpcPort = app.Flag(
"grpc-port", "Job manager gRPC port (jobmgr.grpc_port override) "+
"(set $PORT to override)").
Envar("GRPC_PORT").
Int()
placementDequeLimit = app.Flag(
"placement-dequeue-limit", "Placements dequeue limit for each get "+
"placements attempt (jobmgr.placement_dequeue_limit override) "+
"(set $PLACEMENT_DEQUEUE_LIMIT to override)").
Envar("PLACEMENT_DEQUEUE_LIMIT").
Int()
getPlacementsTimeout = app.Flag(
"get-placements-timeout", "Timeout in milisecs for GetPlacements call "+
"(jobmgr.get_placements_timeout override) "+
"(set $GET_PLACEMENTS_TIMEOUT to override) ").
Envar("GET_PLACEMENTS_TIMEOUT").
Int()
useCassandra = app.Flag(
"use-cassandra", "Use cassandra storage implementation").
Default("true").
Envar("USE_CASSANDRA").
Bool()
cassandraHosts = app.Flag(
"cassandra-hosts", "Cassandra hosts").
Envar("CASSANDRA_HOSTS").
Strings()
cassandraStore = app.Flag(
"cassandra-store", "Cassandra store name").
Default("").
Envar("CASSANDRA_STORE").
String()
cassandraPort = app.Flag(
"cassandra-port", "Cassandra port to connect").
Default("0").
Envar("CASSANDRA_PORT").
Int()
pelotonSecretFile = app.Flag(
"peloton-secret-file",
"Secret file containing all Peloton secrets").
Default("").
Envar("PELOTON_SECRET_FILE").
String()
mesosAgentWorkDir = app.Flag(
"mesos-agent-work-dir", "Mesos agent work dir").
Default("/var/lib/mesos/agent").
Envar("MESOS_AGENT_WORK_DIR").
String()
datacenter = app.Flag(
"datacenter", "Datacenter name").
Default("").
Envar("DATACENTER").
String()
enableSecrets = app.Flag(
"enable-secrets", "enable handing secrets for this cluster").
Default("false").
Envar("ENABLE_SECRETS").
Bool()
// TODO: remove this flag and all related code after
// storage layer can figure out recovery
jobType = app.Flag(
"job-type", "Cluster job type").
Default("BATCH").
Envar("JOB_TYPE").
Enum("BATCH", "SERVICE")
authType = app.Flag(
"auth-type",
"Define the auth type used, default to NOOP").
Default("NOOP").
Envar("AUTH_TYPE").
Enum("NOOP", "BASIC")
authConfigFile = app.Flag(
"auth-config-file",
"config file for the auth feature, which is specific to the auth type used").
Default("").
Envar("AUTH_CONFIG_FILE").
String()
taskKillRateLimit = app.Flag(
"task-kill-rate-limit",
"Define the rate limit of calling task kil from goal state engine",
).
Default("0").
Envar("TASK_KILL_RATE_LIMIT").
Float64()
taskKillBurstLimit = app.Flag(
"task-kill-burst-limit",
"Define the burst limit of calling task kill from goal state engine",
).
Default("0").
Envar("TASK_KILL_BURST_LIMIT").
Int()
executorShutdownRateLimit = app.Flag(
"executor-shutdown-rate-limit",
"Define the rate limit of calling executor shutdown from goal state engine",
).
Default("0").
Envar("EXECUTOR_SHUTDOWN_RATE_LIMIT").
Float64()
executorShutdownBurstLimit = app.Flag(
"executor-shutdown-burst-limit",
"Define the burst limit of calling executor shutdown from goal state engine",
).
Default("0").
Envar("EXECUTOR_SHUTDOWN_BURST_LIMIT").
Int()
taskLaunchTimeout = app.Flag(
"task-launch-timeout",
"Timeout after which a task in launched state will be restarted",
).
Default("0").
Envar("TASK_LAUNCH_TIMEOUT").
String()
taskStartTimeout = app.Flag(
"task-start-timeout",
"Timeout after which a task in started state will be restarted",
).
Default("0").
Envar("TASK_START_TIMEOUT").
String()
hostMgrAPIVersionStr = app.Flag(
"hostmgr-api-version",
"Define the API Version of host manager",
).
Default("").
Envar("HOSTMGR_API_VERSION").
String()
)
func main() {
app.Version(version)
app.HelpFlag.Short('h')
kingpin.MustParse(app.Parse(os.Args[1:]))
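// Configure the logger: JSON output wrapped by the secrets formatter, with
// the application name attached to every log entry.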
log.SetFormatter(
&logging.LogFieldFormatter{
Formatter: &logging.SecretsFormatter{Formatter: &log.JSONFormatter{}},
Fields: log.Fields{
common.AppLogField: app.Name,
},
},
)
initialLevel := log.InfoLevel
if *debug {
initialLevel = log.DebugLevel
}
log.SetLevel(initialLevel)
log.WithField("job_type", jobType).Info("Loaded job type for the cluster")
log.WithField("files", *cfgFiles).Info("Loading job manager config")
var cfg Config
if err := config.Parse(&cfg, *cfgFiles...); err != nil {
log.WithField("error", err).Fatal("Cannot parse yaml config")
}
if *enableSentry {
logging.ConfigureSentry(&cfg.SentryConfig)
}
if *enableSecrets {
cfg.JobManager.JobSvcCfg.EnableSecrets = true
}
// Now override the loaded config.Config with any CLI flags that were set
if *httpPort != 0 {
cfg.JobManager.HTTPPort = *httpPort
}
if *grpcPort != 0 {
cfg.JobManager.GRPCPort = *grpcPort
}
if len(*electionZkServers) > 0 {
cfg.Election.ZKServers = *electionZkServers
}
if *placementDequeLimit != 0 {
cfg.JobManager.Placement.PlacementDequeueLimit = *placementDequeLimit
}
if *getPlacementsTimeout != 0 {
cfg.JobManager.Placement.GetPlacementsTimeout = *getPlacementsTimeout
}
if !*useCassandra {
cfg.Storage.UseCassandra = false
}
if len(*cassandraHosts) > 0 {
cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts
}
if *cassandraStore != "" {
cfg.Storage.Cassandra.StoreName = *cassandraStore
}
if *cassandraPort != 0 {
cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort
}
if *datacenter != "" {
cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter
}
if *hostMgrAPIVersionStr != "" {
hostMgrAPIVersion, err := api.ParseVersion(*hostMgrAPIVersionStr)
if err != nil {
log.WithError(err).Fatal("Failed to parse hostmgr-api-version")
}
cfg.JobManager.HostManagerAPIVersion = hostMgrAPIVersion
}
if cfg.JobManager.HostManagerAPIVersion == "" {
cfg.JobManager.HostManagerAPIVersion = api.V0
}
// Parse and set up Peloton secrets
if *pelotonSecretFile != "" {
var secretsCfg config.PelotonSecretsConfig
if err := config.Parse(&secretsCfg, *pelotonSecretFile); err != nil {
log.WithError(err).
WithField("peloton_secret_file", *pelotonSecretFile).
Fatal("Cannot parse secret config")
}
cfg.Storage.Cassandra.CassandraConn.Username =
secretsCfg.CassandraUsername
cfg.Storage.Cassandra.CassandraConn.Password =
secretsCfg.CassandraPassword
}
// Parse and set up Peloton auth
if len(*authType) != 0 {
cfg.Auth.AuthType = auth.Type(*authType)
cfg.Auth.Path = *authConfigFile
}
// Parse rate limit config
if *taskKillRateLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.TaskKill.Rate = rate.Limit(*taskKillRateLimit)
}
if *taskKillBurstLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.TaskKill.Burst = *taskKillBurstLimit
}
if *executorShutdownRateLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.ExecutorShutdown.Rate = rate.Limit(*executorShutdownRateLimit)
}
if *taskLaunchTimeout != "0" {
var err error
cfg.JobManager.GoalState.LaunchTimeout, err = time.ParseDuration(*taskLaunchTimeout)
if err != nil {
log.WithError(err).
WithField("TASK_LAUNCH_TIMEOUT", *taskLaunchTimeout).
Fatal("Cannot parse launch timeout")
}
}
if *taskStartTimeout != "0" {
var err error
cfg.JobManager.GoalState.StartTimeout, err = time.ParseDuration(*taskStartTimeout)
if err != nil {
log.WithError(err).
WithField("TASK_START_TIMEOUT", *taskStartTimeout).
Fatal("Cannot parse start timeout")
}
}
if *executorShutdownBurstLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.ExecutorShutdown.Burst = *executorShutdownBurstLimit
}
// Validate thermos executor config
if err := cfg.JobManager.JobSvcCfg.ThermosExecutor.Validate(); err != nil {
log.WithError(err).Fatal("Cannot validate thermos executor config")
}
log.WithField("config", cfg).Info("Loaded Job Manager configuration")
rootScope, scopeCloser, mux := metrics.InitMetricScope(
&cfg.Metrics,
common.PelotonJobManager,
metrics.TallyFlushInterval,
)
defer scopeCloser.Close()
mux.HandleFunc(
logging.LevelOverwrite,
logging.LevelOverwriteHandler(initialLevel),
)
mux.HandleFunc(buildversion.Get, buildversion.Handler(version))
// store implements JobStore, TaskStore, VolumeStore, UpdateStore
// and FrameworkInfoStore
store := stores.MustCreateStore(&cfg.Storage, rootScope)
ormStore, ormErr := ormobjects.NewCassandraStore(
cassandra.ToOrmConfig(&cfg.Storage.Cassandra),
rootScope)
if ormErr != nil {
log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra")
}
// Create both HTTP and GRPC inbounds
inbounds := rpc.NewInbounds(
cfg.JobManager.HTTPPort,
cfg.JobManager.GRPCPort,
mux,
)
// all leader discovery metrics share a scope (and will be tagged
// with role={role})
discoveryScope := rootScope.SubScope("discovery")
// setup the discovery service to detect resmgr leaders and
// configure the YARPC Peer dynamically
t := rpc.NewTransport()
resmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
discoveryScope,
common.ResourceManagerRole,
t,
)
if err != nil {
log.WithFields(log.Fields{"error": err, "role": common.ResourceManagerRole}).
Fatal("Could not create smart peer chooser")
}
defer resmgrPeerChooser.Stop()
resmgrOutbound := t.NewOutbound(resmgrPeerChooser)
// setup the discovery service to detect hostmgr leaders and
// configure the YARPC Peer dynamically
hostmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
discoveryScope,
common.HostManagerRole,
t,
)
if err != nil {
log.WithFields(log.Fields{"error": err, "role": common.HostManagerRole}).
Fatal("Could not create smart peer chooser")
}
defer hostmgrPeerChooser.Stop()
hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)
outbounds := yarpc.Outbounds{
common.PelotonResourceManager: transport.Outbounds{
Unary: resmgrOutbound,
},
common.PelotonHostManager: transport.Outbounds{
Unary: hostmgrOutbound,
},
}
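// Set up inbound middleware (API lock, rate limiting, authentication and
// YARPC metrics) and outbound authentication middleware.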
securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not enable security feature")
}
rateLimitMiddleware, err := inbound.NewRateLimitInboundMiddleware(cfg.RateLimit)
if err != nil {
log.WithError(err).
Fatal("Could not create rate limit middleware")
}
authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)
apiLockInboundMiddleware := inbound.NewAPILockInboundMiddleware(&cfg.APILock)
yarpcMetricsMiddleware := &inbound.YAPRCMetricsInboundMiddleware{Scope: rootScope.SubScope("yarpc")}
securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not establish secure inter-component communication")
}
authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)
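// Create the YARPC dispatcher wired with the inbounds, outbounds and the
// middleware chains configured above.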
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: common.PelotonJobManager,
Inbounds: inbounds,
Outbounds: outbounds,
Metrics: yarpc.MetricsConfig{
Tally: rootScope,
},
InboundMiddleware: yarpc.InboundMiddleware{
Unary: yarpc.UnaryInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
Stream: yarpc.StreamInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
Oneway: yarpc.OnewayInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
},
OutboundMiddleware: yarpc.OutboundMiddleware{
Unary: authOutboundMiddleware,
Stream: authOutboundMiddleware,
Oneway: authOutboundMiddleware,
},
})
// Declare background works to register with the background manager
backgroundManager := background.NewManager()
// Register UpdateActiveTasks function
activeJobCache := activermtask.NewActiveRMTasks(dispatcher, rootScope)
backgroundManager.RegisterWorks(
background.Work{
Name: "ActiveCacheJob",
Func: func(_ *atomic.Bool) {
activeJobCache.UpdateActiveTasks()
},
Period: time.Duration(cfg.JobManager.ActiveTaskUpdatePeriod),
},
)
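// Initialize the v1alpha watch service handler and its processor, which is
// also registered as a listener on the job factory cache below.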
watchProcessor := watchsvc.InitV1AlphaWatchServiceHandler(
dispatcher,
rootScope,
cfg.JobManager.Watch,
)
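// Create the job factory backing the in-memory job and task cache.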
jobFactory := cached.InitJobFactory(
store, // store implements JobStore
store, // store implements TaskStore
store, // store implements UpdateStore
store, // store implements VolumeStore
ormStore,
rootScope,
[]cached.JobTaskListener{watchsvc.NewWatchListener(watchProcessor)},
)
// Register WorkflowProgressCheck
workflowCheck := &progress.WorkflowProgressCheck{
JobFactory: jobFactory,
Metrics: progress.NewMetrics(rootScope),
Config: &cfg.JobManager.WorkflowProgressCheck,
}
if err := workflowCheck.Register(backgroundManager); err != nil {
log.WithError(err).
Fatal("fail to register workflowCheck in backgroundManager")
}
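// Create the goal state driver, which reconciles the current state of jobs,
// tasks and updates with their goal state.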
goalStateDriver := goalstate.NewDriver(
dispatcher,
store, // store implements JobStore
store, // store implements TaskStore
store, // store implements VolumeStore
store, // store implements UpdateStore
ormStore,
jobFactory,
job.JobType(job.JobType_value[*jobType]),
rootScope,
cfg.JobManager.GoalState,
cfg.JobManager.HostManagerAPIVersion,
)
// Init placement processor
placementProcessor := placement.InitProcessor(
dispatcher,
common.PelotonResourceManager,
jobFactory,
goalStateDriver,
cfg.JobManager.HostManagerAPIVersion,
ormStore,
&cfg.JobManager.Placement,
rootScope,
)
// Create a new task evictor
taskEvictor := evictor.New(
dispatcher,
common.PelotonResourceManager,
ormStore, // store implements TaskStore
jobFactory,
goalStateDriver,
cfg.JobManager.HostManagerAPIVersion,
&cfg.JobManager.Evictor,
rootScope,
)
// Create a new deadline tracker for jobs
deadlineTracker := deadline.New(
dispatcher,
store, // store implements JobStore
store, // store implements TaskStore
jobFactory,
goalStateDriver,
rootScope,
&cfg.JobManager.Deadline,
)
// Create the task status update handler, which pulls task update events
// from the host manager once started after gaining leadership
statusUpdate := event.NewTaskStatusUpdate(
dispatcher,
store, // store implements JobStore
store, // store implements TaskStore
store, // store implements VolumeStore
jobFactory,
goalStateDriver,
[]event.Listener{},
rootScope,
cfg.JobManager.HostManagerAPIVersion,
)
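// Create the job manager server, which starts and stops the components
// above as this instance gains or loses leadership.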
server := jobmgr.NewServer(
cfg.JobManager.HTTPPort,
cfg.JobManager.GRPCPort,
jobFactory,
goalStateDriver,
taskEvictor,
deadlineTracker,
placementProcessor,
statusUpdate,
backgroundManager,
watchProcessor,
)
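// Create the leader election candidate for the job manager role; the server
// is notified on leadership changes.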
candidate, err := leader.NewCandidate(
cfg.Election,
rootScope,
common.JobManagerRole,
server,
)
if err != nil {
log.Fatalf("Unable to create leader candidate: %v", err)
}
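// Register the job service handler.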
jobsvc.InitServiceHandler(
dispatcher,
rootScope,
store, // store implements JobStore
store, // store implements TaskStore
ormStore,
jobFactory,
goalStateDriver,
candidate,
common.PelotonResourceManager, // TODO: to be removed
cfg.JobManager.JobSvcCfg,
)
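// Register the private (internal) job service handler.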
private.InitPrivateJobServiceHandler(
dispatcher,
store,
store,
store,
ormStore,
jobFactory,
goalStateDriver,
candidate,
)
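// Register the v1alpha stateless job service handler.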
stateless.InitV1AlphaJobServiceHandler(
dispatcher,
store,
store,
store,
ormStore,
jobFactory,
goalStateDriver,
candidate,
cfg.JobManager.JobSvcCfg,
activeJobCache,
)
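// Register the task service handler.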
tasksvc.InitServiceHandler(
dispatcher,
rootScope,
ormStore,
store, // store implements TaskStore
store, // store implements UpdateStore
store, // store implements FrameworkInfoStore
jobFactory,
goalStateDriver,
candidate,
*mesosAgentWorkDir,
common.PelotonHostManager,
logmanager.NewLogManager(&http.Client{Timeout: _httpClientTimeout}),
activeJobCache,
cfg.JobManager.HostManagerAPIVersion,
)
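// Register the v1alpha pod service handler.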
podsvc.InitV1AlphaPodServiceHandler(
dispatcher,
store,
store,
store,
ormStore,
jobFactory,
goalStateDriver,
candidate,
logmanager.NewLogManager(&http.Client{Timeout: _httpClientTimeout}),
*mesosAgentWorkDir,
hostsvc.NewInternalHostServiceYARPCClient(dispatcher.ClientConfig(common.PelotonHostManager)),
)
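// Register the volume service handler.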
volumesvc.InitServiceHandler(
dispatcher,
rootScope,
)
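// Register the update service handler.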
updatesvc.InitServiceHandler(
dispatcher,
rootScope,
ormStore,
store, // store implements UpdateStore
goalStateDriver,
jobFactory,
)
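// Register the admin service handler, wired to the goal state driver and
// the API lock middleware.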
adminsvc.InitServiceHandler(
dispatcher,
goalStateDriver,
apiLockInboundMiddleware,
)
// Start dispatch loop
if err := dispatcher.Start(); err != nil {
log.Fatalf("Could not start rpc server: %v", err)
}
err = candidate.Start()
if err != nil {
log.Fatalf("Unable to start leader candidate: %v", err)
}
defer candidate.Stop()
log.WithFields(log.Fields{
"httpPort": cfg.JobManager.HTTPPort,
"grpcPort": cfg.JobManager.GRPCPort,
}).Info("Started job manager")
// we can *honestly* say the server is booted up now
health.InitHeartbeat(rootScope, cfg.Health, candidate)
// start collecting runtime metrics
defer metrics.StartCollectingRuntimeMetrics(
rootScope,
cfg.Metrics.RuntimeMetrics.Enabled,
cfg.Metrics.RuntimeMetrics.CollectInterval)()
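// Block forever; the job manager runs until the process is terminated.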
select {}
}