in cmd/jobmgr/main.go [266:779]
func main() {
app.Version(version)
app.HelpFlag.Short('h')
kingpin.MustParse(app.Parse(os.Args[1:]))
log.SetFormatter(
&logging.LogFieldFormatter{
Formatter: &logging.SecretsFormatter{Formatter: &log.JSONFormatter{}},
Fields: log.Fields{
common.AppLogField: app.Name,
},
},
)
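// Default to info-level logging; the debug flag raises it to debug.
// initialLevel also seeds the level-overwrite debug endpoint registered below.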
initialLevel := log.InfoLevel
if *debug {
initialLevel = log.DebugLevel
}
log.SetLevel(initialLevel)
log.WithField("job_type", jobType).Info("Loaded job type for the cluster")
log.WithField("files", *cfgFiles).Info("Loading job manager config")
var cfg Config
if err := config.Parse(&cfg, *cfgFiles...); err != nil {
log.WithField("error", err).Fatal("Cannot parse yaml config")
}
if *enableSentry {
logging.ConfigureSentry(&cfg.SentryConfig)
}
if *enableSecrets {
cfg.JobManager.JobSvcCfg.EnableSecrets = true
}
// Now override the loaded config.Config with any CLI flags that were set.
if *httpPort != 0 {
cfg.JobManager.HTTPPort = *httpPort
}
if *grpcPort != 0 {
cfg.JobManager.GRPCPort = *grpcPort
}
if len(*electionZkServers) > 0 {
cfg.Election.ZKServers = *electionZkServers
}
if *placementDequeLimit != 0 {
cfg.JobManager.Placement.PlacementDequeueLimit = *placementDequeLimit
}
if *getPlacementsTimeout != 0 {
cfg.JobManager.Placement.GetPlacementsTimeout = *getPlacementsTimeout
}
if !*useCassandra {
cfg.Storage.UseCassandra = false
}
if len(*cassandraHosts) > 0 {
cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts
}
if *cassandraStore != "" {
cfg.Storage.Cassandra.StoreName = *cassandraStore
}
if *cassandraPort != 0 {
cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort
}
if *datacenter != "" {
cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter
}
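// Resolve the Host Manager API version: parse the CLI flag if given, otherwise fall back to v0.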
if *hostMgrAPIVersionStr != "" {
hostMgrAPIVersion, err := api.ParseVersion(*hostMgrAPIVersionStr)
if err != nil {
log.WithError(err).Fatal("Failed to parse hostmgr-api-version")
}
cfg.JobManager.HostManagerAPIVersion = hostMgrAPIVersion
}
if cfg.JobManager.HostManagerAPIVersion == "" {
cfg.JobManager.HostManagerAPIVersion = api.V0
}
// Parse and set up Peloton secrets
if *pelotonSecretFile != "" {
var secretsCfg config.PelotonSecretsConfig
if err := config.Parse(&secretsCfg, *pelotonSecretFile); err != nil {
log.WithError(err).
WithField("peloton_secret_file", *pelotonSecretFile).
Fatal("Cannot parse secret config")
}
cfg.Storage.Cassandra.CassandraConn.Username =
secretsCfg.CassandraUsername
cfg.Storage.Cassandra.CassandraConn.Password =
secretsCfg.CassandraPassword
}
// Parse and set up Peloton auth
if len(*authType) != 0 {
cfg.Auth.AuthType = auth.Type(*authType)
cfg.Auth.Path = *authConfigFile
}
// Parse rate limit config
if *taskKillRateLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.TaskKill.Rate = rate.Limit(*taskKillRateLimit)
}
if *taskKillBurstLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.TaskKill.Burst = *taskKillBurstLimit
}
if *executorShutdownRateLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.ExecutorShutdown.Rate = rate.Limit(*executorShutdownRateLimit)
}
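// Parse the task launch and start timeouts used by the goal state engine.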
if *taskLaunchTimeout != "0" {
var err error
cfg.JobManager.GoalState.LaunchTimeout, err = time.ParseDuration(*taskLaunchTimeout)
if err != nil {
log.WithError(err).
WithField("TASK_LAUNCH_TIMEOUT", *taskLaunchTimeout).
Fatal("Cannot parse launch timeout")
}
}
if *taskStartTimeout != "0" {
var err error
cfg.JobManager.GoalState.StartTimeout, err = time.ParseDuration(*taskStartTimeout)
if err != nil {
log.WithError(err).
WithField("TASK_START_TIMEOUT", *taskStartTimeout).
Fatal("Cannot parse start timeout")
}
}
if *executorShutdownBurstLimit != 0 {
cfg.JobManager.GoalState.RateLimiterConfig.ExecutorShutdown.Burst = *executorShutdownBurstLimit
}
// Validate thermos executor config
if err := cfg.JobManager.JobSvcCfg.ThermosExecutor.Validate(); err != nil {
log.WithError(err).Fatal("Cannot validate thermos executor config")
}
log.WithField("config", cfg).Info("Loaded Job Manager configuration")
rootScope, scopeCloser, mux := metrics.InitMetricScope(
&cfg.Metrics,
common.PelotonJobManager,
metrics.TallyFlushInterval,
)
defer scopeCloser.Close()
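// Expose debug endpoints to overwrite the log level and to query the build version at runtime.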
mux.HandleFunc(
logging.LevelOverwrite,
logging.LevelOverwriteHandler(initialLevel),
)
mux.HandleFunc(buildversion.Get, buildversion.Handler(version))
// store implements JobStore, TaskStore, VolumeStore, UpdateStore
// and FrameworkInfoStore
store := stores.MustCreateStore(&cfg.Storage, rootScope)
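// Also create an ORM-backed Cassandra store; it is passed alongside the legacy store to the components below.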
ormStore, ormErr := ormobjects.NewCassandraStore(
cassandra.ToOrmConfig(&cfg.Storage.Cassandra),
rootScope)
if ormErr != nil {
log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra")
}
// Create both HTTP and GRPC inbounds
inbounds := rpc.NewInbounds(
cfg.JobManager.HTTPPort,
cfg.JobManager.GRPCPort,
mux,
)
// all leader discovery metrics share a scope (and will be tagged
// with role={role})
discoveryScope := rootScope.SubScope("discovery")
// set up the discovery service to detect resmgr leaders and
// configure the YARPC Peer dynamically
t := rpc.NewTransport()
resmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
discoveryScope,
common.ResourceManagerRole,
t,
)
if err != nil {
log.WithFields(log.Fields{"error": err, "role": common.ResourceManagerRole}).
Fatal("Could not create smart peer chooser")
}
defer resmgrPeerChooser.Stop()
resmgrOutbound := t.NewOutbound(resmgrPeerChooser)
// set up the discovery service to detect hostmgr leaders and
// configure the YARPC Peer dynamically
hostmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
discoveryScope,
common.HostManagerRole,
t,
)
if err != nil {
log.WithFields(log.Fields{"error": err, "role": common.HostManagerRole}).
Fatal("Could not create smart peer chooser")
}
defer hostmgrPeerChooser.Stop()
hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)
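// Route outbound calls to the resource manager and host manager through the leader-aware peer choosers created above.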
outbounds := yarpc.Outbounds{
common.PelotonResourceManager: transport.Outbounds{
Unary: resmgrOutbound,
},
common.PelotonHostManager: transport.Outbounds{
Unary: hostmgrOutbound,
},
}
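// Set up inbound middleware: authentication, API lock, rate limiting, and YARPC metrics.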
securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not enable security feature")
}
rateLimitMiddleware, err := inbound.NewRateLimitInboundMiddleware(cfg.RateLimit)
if err != nil {
log.WithError(err).
Fatal("Could not create rate limit middleware")
}
authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)
apiLockInboundMiddleware := inbound.NewAPILockInboundMiddleware(&cfg.APILock)
yarpcMetricsMiddleware := &inbound.YAPRCMetricsInboundMiddleware{Scope: rootScope.SubScope("yarpc")}
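// The security client authenticates this instance's outbound calls to other Peloton components via the auth outbound middleware.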
securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not establish secure inter-component communication")
}
authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)
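// Create the YARPC dispatcher, wiring together the inbounds, outbounds, and the middleware chains above.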
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: common.PelotonJobManager,
Inbounds: inbounds,
Outbounds: outbounds,
Metrics: yarpc.MetricsConfig{
Tally: rootScope,
},
InboundMiddleware: yarpc.InboundMiddleware{
Unary: yarpc.UnaryInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
Stream: yarpc.StreamInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
Oneway: yarpc.OnewayInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
},
OutboundMiddleware: yarpc.OutboundMiddleware{
Unary: authOutboundMiddleware,
Stream: authOutboundMiddleware,
Oneway: authOutboundMiddleware,
},
})
// Declare background works
backgroundManager := background.NewManager()
// Register UpdateActiveTasks function
activeJobCache := activermtask.NewActiveRMTasks(dispatcher, rootScope)
backgroundManager.RegisterWorks(
background.Work{
Name: "ActiveCacheJob",
Func: func(_ *atomic.Bool) {
activeJobCache.UpdateActiveTasks()
},
Period: time.Duration(cfg.JobManager.ActiveTaskUpdatePeriod),
},
)
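// Initialize the v1alpha watch service handler; its processor is registered as a job/task listener on the job factory below.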
watchProcessor := watchsvc.InitV1AlphaWatchServiceHandler(
dispatcher,
rootScope,
cfg.JobManager.Watch,
)
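// The cached job factory is the in-memory view of jobs and tasks shared by the goal state driver and the service handlers below.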
jobFactory := cached.InitJobFactory(
store, // store implements JobStore
store, // store implements TaskStore
store, // store implements UpdateStore
store, // store implements VolumeStore
ormStore,
rootScope,
[]cached.JobTaskListener{watchsvc.NewWatchListener(watchProcessor)},
)
// Register WorkflowProgressCheck
workflowCheck := &progress.WorkflowProgressCheck{
JobFactory: jobFactory,
Metrics: progress.NewMetrics(rootScope),
Config: &cfg.JobManager.WorkflowProgressCheck,
}
if err := workflowCheck.Register(backgroundManager); err != nil {
log.WithError(err).
Fatal("fail to register workflowCheck in backgroundManager")
}
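// Create the goal state driver, which converges jobs and tasks toward their goal (desired) state.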
goalStateDriver := goalstate.NewDriver(
dispatcher,
store, // store implements JobStore
store, // store implements TaskStore
store, // store implements VolumeStore
store, // store implements UpdateStore
ormStore,
jobFactory,
job.JobType(job.JobType_value[*jobType]),
rootScope,
cfg.JobManager.GoalState,
cfg.JobManager.HostManagerAPIVersion,
)
// Init placement processor
placementProcessor := placement.InitProcessor(
dispatcher,
common.PelotonResourceManager,
jobFactory,
goalStateDriver,
cfg.JobManager.HostManagerAPIVersion,
ormStore,
&cfg.JobManager.Placement,
rootScope,
)
// Create a new task evictor
taskEvictor := evictor.New(
dispatcher,
common.PelotonResourceManager,
ormStore,
jobFactory,
goalStateDriver,
cfg.JobManager.HostManagerAPIVersion,
&cfg.JobManager.Evictor,
rootScope,
)
// Create a new deadline tracker for jobs
deadlineTracker := deadline.New(
dispatcher,
store, // store implements JobStore
store, // store implements TaskStore
jobFactory,
goalStateDriver,
rootScope,
&cfg.JobManager.Deadline,
)
// Create the task status update handler, which pulls task update events
// from the host manager once started after gaining leadership
statusUpdate := event.NewTaskStatusUpdate(
dispatcher,
store, // store implements JobStore
store, // store implements TaskStore
store, // store implements VolumeStore
jobFactory,
goalStateDriver,
[]event.Listener{},
rootScope,
cfg.JobManager.HostManagerAPIVersion,
)
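// Bundle the components above into the job manager server; they are started and stopped as this instance gains or loses leadership.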
server := jobmgr.NewServer(
cfg.JobManager.HTTPPort,
cfg.JobManager.GRPCPort,
jobFactory,
goalStateDriver,
taskEvictor,
deadlineTracker,
placementProcessor,
statusUpdate,
backgroundManager,
watchProcessor,
)
candidate, err := leader.NewCandidate(
cfg.Election,
rootScope,
common.JobManagerRole,
server,
)
if err != nil {
log.Fatalf("Unable to create leader candidate: %v", err)
}
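// Register the remaining service handlers on the dispatcher: the v0 job/task/volume/update APIs, the private and v1alpha stateless job APIs, the v1alpha pod API, and the admin API.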
jobsvc.InitServiceHandler(
dispatcher,
rootScope,
store, // store implements JobStore
store, // store implements TaskStore
ormStore,
jobFactory,
goalStateDriver,
candidate,
common.PelotonResourceManager, // TODO: to be removed
cfg.JobManager.JobSvcCfg,
)
private.InitPrivateJobServiceHandler(
dispatcher,
store,
store,
store,
ormStore,
jobFactory,
goalStateDriver,
candidate,
)
stateless.InitV1AlphaJobServiceHandler(
dispatcher,
store,
store,
store,
ormStore,
jobFactory,
goalStateDriver,
candidate,
cfg.JobManager.JobSvcCfg,
activeJobCache,
)
tasksvc.InitServiceHandler(
dispatcher,
rootScope,
ormStore,
store, // store implements TaskStore
store, // store implements UpdateStore
store, // store implements FrameworkInfoStore
jobFactory,
goalStateDriver,
candidate,
*mesosAgentWorkDir,
common.PelotonHostManager,
logmanager.NewLogManager(&http.Client{Timeout: _httpClientTimeout}),
activeJobCache,
cfg.JobManager.HostManagerAPIVersion,
)
podsvc.InitV1AlphaPodServiceHandler(
dispatcher,
store,
store,
store,
ormStore,
jobFactory,
goalStateDriver,
candidate,
logmanager.NewLogManager(&http.Client{Timeout: _httpClientTimeout}),
*mesosAgentWorkDir,
hostsvc.NewInternalHostServiceYARPCClient(dispatcher.ClientConfig(common.PelotonHostManager)),
)
volumesvc.InitServiceHandler(
dispatcher,
rootScope,
)
updatesvc.InitServiceHandler(
dispatcher,
rootScope,
ormStore,
store, // store implements UpdateStore
goalStateDriver,
jobFactory,
)
adminsvc.InitServiceHandler(
dispatcher,
goalStateDriver,
apiLockInboundMiddleware,
)
// Start dispatch loop
if err := dispatcher.Start(); err != nil {
log.Fatalf("Could not start rpc server: %v", err)
}
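// Join leader election; the server's leadership callbacks start and stop the components registered above.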
err = candidate.Start()
if err != nil {
log.Fatalf("Unable to start leader candidate: %v", err)
}
defer candidate.Stop()
log.WithFields(log.Fields{
"httpPort": cfg.JobManager.HTTPPort,
"grpcPort": cfg.JobManager.GRPCPort,
}).Info("Started job manager")
// we can *honestly* say the server is booted up now
health.InitHeartbeat(rootScope, cfg.Health, candidate)
// start collecting runtime metrics
defer metrics.StartCollectingRuntimeMetrics(
rootScope,
cfg.Metrics.RuntimeMetrics.Enabled,
cfg.Metrics.RuntimeMetrics.CollectInterval)()
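// Block forever; the process runs until it is terminated or a fatal error occurs.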
select {}
}