func main()

in cmd/jobmgr/main.go [266:779]


func main() {
	app.Version(version)
	app.HelpFlag.Short('h')
	kingpin.MustParse(app.Parse(os.Args[1:]))

	log.SetFormatter(
		&logging.LogFieldFormatter{
			Formatter: &logging.SecretsFormatter{Formatter: &log.JSONFormatter{}},
			Fields: log.Fields{
				common.AppLogField: app.Name,
			},
		},
	)

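	// Set the initial log level from the debug flag; it can be changed at
	// runtime through the log-level endpoint registered on the mux below.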
	initialLevel := log.InfoLevel
	if *debug {
		initialLevel = log.DebugLevel
	}
	log.SetLevel(initialLevel)

	log.WithField("job_type", jobType).Info("Loaded job type for the cluster")

	log.WithField("files", *cfgFiles).Info("Loading job manager config")
	var cfg Config
	if err := config.Parse(&cfg, *cfgFiles...); err != nil {
		log.WithField("error", err).Fatal("Cannot parse yaml config")
	}

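	// Optional features controlled by CLI flags: Sentry error logging and
	// secrets support in the job service.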
	if *enableSentry {
		logging.ConfigureSentry(&cfg.SentryConfig)
	}

	if *enableSecrets {
		cfg.JobManager.JobSvcCfg.EnableSecrets = true
	}

	// Now override the loaded config.Config with any CLI flags that were set.
	if *httpPort != 0 {
		cfg.JobManager.HTTPPort = *httpPort
	}

	if *grpcPort != 0 {
		cfg.JobManager.GRPCPort = *grpcPort
	}

	if len(*electionZkServers) > 0 {
		cfg.Election.ZKServers = *electionZkServers
	}

	if *placementDequeLimit != 0 {
		cfg.JobManager.Placement.PlacementDequeueLimit = *placementDequeLimit
	}

	if *getPlacementsTimeout != 0 {
		cfg.JobManager.Placement.GetPlacementsTimeout = *getPlacementsTimeout
	}

	if !*useCassandra {
		cfg.Storage.UseCassandra = false
	}

	if len(*cassandraHosts) > 0 {
		cfg.Storage.Cassandra.CassandraConn.ContactPoints = *cassandraHosts
	}

	if *cassandraStore != "" {
		cfg.Storage.Cassandra.StoreName = *cassandraStore
	}

	if *cassandraPort != 0 {
		cfg.Storage.Cassandra.CassandraConn.Port = *cassandraPort
	}

	if *datacenter != "" {
		cfg.Storage.Cassandra.CassandraConn.DataCenter = *datacenter
	}

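	// Resolve the host manager API version from the CLI flag, defaulting
	// to v0 when it is not specified anywhere.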
	if *hostMgrAPIVersionStr != "" {
		hostMgrAPIVersion, err := api.ParseVersion(*hostMgrAPIVersionStr)
		if err != nil {
			log.WithError(err).Fatal("Failed to parse hostmgr-api-version")
		}
		cfg.JobManager.HostManagerAPIVersion = hostMgrAPIVersion
	}
	if cfg.JobManager.HostManagerAPIVersion == "" {
		cfg.JobManager.HostManagerAPIVersion = api.V0
	}

	// Parse and set up Peloton secrets
	if *pelotonSecretFile != "" {
		var secretsCfg config.PelotonSecretsConfig
		if err := config.Parse(&secretsCfg, *pelotonSecretFile); err != nil {
			log.WithError(err).
				WithField("peloton_secret_file", *pelotonSecretFile).
				Fatal("Cannot parse secret config")
		}
		cfg.Storage.Cassandra.CassandraConn.Username =
			secretsCfg.CassandraUsername
		cfg.Storage.Cassandra.CassandraConn.Password =
			secretsCfg.CassandraPassword
	}

	// Parse and set up Peloton auth
	if len(*authType) != 0 {
		cfg.Auth.AuthType = auth.Type(*authType)
		cfg.Auth.Path = *authConfigFile
	}

	// Parse rate limit config
	if *taskKillRateLimit != 0 {
		cfg.JobManager.GoalState.RateLimiterConfig.TaskKill.Rate = rate.Limit(*taskKillRateLimit)
	}

	if *taskKillBurstLimit != 0 {
		cfg.JobManager.GoalState.RateLimiterConfig.TaskKill.Burst = *taskKillBurstLimit
	}

	if *executorShutdownRateLimit != 0 {
		cfg.JobManager.GoalState.RateLimiterConfig.ExecutorShutdown.Rate = rate.Limit(*executorShutdownRateLimit)
	}

	if *taskLaunchTimeout != "0" {
		var err error
		cfg.JobManager.GoalState.LaunchTimeout, err = time.ParseDuration(*taskLaunchTimeout)
		if err != nil {
			log.WithError(err).
				WithField("TASK_LAUNCH_TIMEOUT", *taskLaunchTimeout).
				Fatal("Cannot parse launch timeout")
		}
	}

	if *taskStartTimeout != "0" {
		var err error
		cfg.JobManager.GoalState.StartTimeout, err = time.ParseDuration(*taskStartTimeout)
		if err != nil {
			log.WithError(err).
				WithField("TASK_START_TIMEOUT", *taskStartTimeout).
				Fatal("Cannot parse start timeout")
		}
	}

	if *executorShutdownBurstLimit != 0 {
		cfg.JobManager.GoalState.RateLimiterConfig.ExecutorShutdown.Burst = *executorShutdownBurstLimit
	}

	// Validate thermos executor config
	if err := cfg.JobManager.JobSvcCfg.ThermosExecutor.Validate(); err != nil {
		log.WithError(err).Fatal("Cannot validate thermos executor config")
	}

	log.WithField("config", cfg).Info("Loaded Job Manager configuration")

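	// Initialize the Tally metrics scope and the HTTP mux used for the
	// operational endpoints registered below.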
	rootScope, scopeCloser, mux := metrics.InitMetricScope(
		&cfg.Metrics,
		common.PelotonJobManager,
		metrics.TallyFlushInterval,
	)
	defer scopeCloser.Close()

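	// Expose HTTP endpoints for runtime log-level overrides and the build
	// version.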
	mux.HandleFunc(
		logging.LevelOverwrite,
		logging.LevelOverwriteHandler(initialLevel),
	)

	mux.HandleFunc(buildversion.Get, buildversion.Handler(version))

	// store implements JobStore, TaskStore, VolumeStore, UpdateStore
	// and FrameworkInfoStore
	store := stores.MustCreateStore(&cfg.Storage, rootScope)
	ormStore, ormErr := ormobjects.NewCassandraStore(
		cassandra.ToOrmConfig(&cfg.Storage.Cassandra),
		rootScope)
	if ormErr != nil {
		log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra")
	}

	// Create both HTTP and GRPC inbounds
	inbounds := rpc.NewInbounds(
		cfg.JobManager.HTTPPort,
		cfg.JobManager.GRPCPort,
		mux,
	)

	// all leader discovery metrics share a scope (and will be tagged
	// with role={role})
	discoveryScope := rootScope.SubScope("discovery")
	// setup the discovery service to detect resmgr leaders and
	// configure the YARPC Peer dynamically
	t := rpc.NewTransport()
	resmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		discoveryScope,
		common.ResourceManagerRole,
		t,
	)
	if err != nil {
		log.WithFields(log.Fields{"error": err, "role": common.ResourceManagerRole}).
			Fatal("Could not create smart peer chooser")
	}
	defer resmgrPeerChooser.Stop()

	resmgrOutbound := t.NewOutbound(resmgrPeerChooser)

	// setup the discovery service to detect hostmgr leaders and
	// configure the YARPC Peer dynamically
	hostmgrPeerChooser, err := peer.NewSmartChooser(
		cfg.Election,
		discoveryScope,
		common.HostManagerRole,
		t,
	)
	if err != nil {
		log.WithFields(log.Fields{"error": err, "role": common.HostManagerRole}).
			Fatal("Could not create smart peer chooser")
	}
	defer hostmgrPeerChooser.Stop()

	hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)

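	// Route outbound RPCs to the resource manager and host manager through
	// their leader-aware outbounds.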
	outbounds := yarpc.Outbounds{
		common.PelotonResourceManager: transport.Outbounds{
			Unary: resmgrOutbound,
		},
		common.PelotonHostManager: transport.Outbounds{
			Unary: hostmgrOutbound,
		},
	}

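	// Build the inbound middleware (API lock, rate limiting, auth, YARPC
	// metrics) and the outbound auth middleware.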
	securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not enable security feature")
	}

	rateLimitMiddleware, err := inbound.NewRateLimitInboundMiddleware(cfg.RateLimit)
	if err != nil {
		log.WithError(err).
			Fatal("Could not create rate limit middleware")
	}
	authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)
	apiLockInboundMiddleware := inbound.NewAPILockInboundMiddleware(&cfg.APILock)

	yarpcMetricsMiddleware := &inbound.YAPRCMetricsInboundMiddleware{Scope: rootScope.SubScope("yarpc")}

	securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth)
	if err != nil {
		log.WithError(err).
			Fatal("Could not establish secure inter-component communication")
	}

	authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)

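	// Create the YARPC dispatcher, wiring together inbounds, outbounds and
	// middleware.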
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name:      common.PelotonJobManager,
		Inbounds:  inbounds,
		Outbounds: outbounds,
		Metrics: yarpc.MetricsConfig{
			Tally: rootScope,
		},
		InboundMiddleware: yarpc.InboundMiddleware{
			Unary:  yarpc.UnaryInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
			Stream: yarpc.StreamInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
			Oneway: yarpc.OnewayInboundMiddleware(apiLockInboundMiddleware, rateLimitMiddleware, authInboundMiddleware, yarpcMetricsMiddleware),
		},
		OutboundMiddleware: yarpc.OutboundMiddleware{
			Unary:  authOutboundMiddleware,
			Stream: authOutboundMiddleware,
			Oneway: authOutboundMiddleware,
		},
	})

	// Set up the background work manager
	backgroundManager := background.NewManager()

	// Register UpdateActiveTasks function
	activeJobCache := activermtask.NewActiveRMTasks(dispatcher, rootScope)

	backgroundManager.RegisterWorks(
		background.Work{
			Name: "ActiveCacheJob",
			Func: func(_ *atomic.Bool) {
				activeJobCache.UpdateActiveTasks()
			},
			Period: time.Duration(cfg.JobManager.ActiveTaskUpdatePeriod),
		},
	)
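
	// Initialize the v1alpha watch service handler, which streams state
	// changes to watch clients.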
	watchProcessor := watchsvc.InitV1AlphaWatchServiceHandler(
		dispatcher,
		rootScope,
		cfg.JobManager.Watch,
	)

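	// Create the cached job factory; the watch listener is notified of job
	// and task changes in the cache.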
	jobFactory := cached.InitJobFactory(
		store, // store implements JobStore
		store, // store implements TaskStore
		store, // store implements UpdateStore
		store, // store implements VolumeStore
		ormStore,
		rootScope,
		[]cached.JobTaskListener{watchsvc.NewWatchListener(watchProcessor)},
	)

	// Register WorkflowProgressCheck
	workflowCheck := &progress.WorkflowProgressCheck{
		JobFactory: jobFactory,
		Metrics:    progress.NewMetrics(rootScope),
		Config:     &cfg.JobManager.WorkflowProgressCheck,
	}
	if err := workflowCheck.Register(backgroundManager); err != nil {
		log.WithError(err).
			Fatal("fail to register workflowCheck in backgroundManager")
	}

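	// Create the goal state driver, which drives jobs, tasks and updates
	// from their current state toward their goal state.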
	goalStateDriver := goalstate.NewDriver(
		dispatcher,
		store, // store implements JobStore
		store, // store implements TaskStore
		store, // store implements VolumeStore
		store, // store implements UpdateStore
		ormStore,
		jobFactory,
		job.JobType(job.JobType_value[*jobType]),
		rootScope,
		cfg.JobManager.GoalState,
		cfg.JobManager.HostManagerAPIVersion,
	)

	// Init placement processor
	placementProcessor := placement.InitProcessor(
		dispatcher,
		common.PelotonResourceManager,
		jobFactory,
		goalStateDriver,
		cfg.JobManager.HostManagerAPIVersion,
		ormStore,
		&cfg.JobManager.Placement,
		rootScope,
	)

	// Create a new task evictor
	taskEvictor := evictor.New(
		dispatcher,
		common.PelotonResourceManager,
		ormStore, // store implements TaskStore
		jobFactory,
		goalStateDriver,
		cfg.JobManager.HostManagerAPIVersion,
		&cfg.JobManager.Evictor,
		rootScope,
	)

	// Create a new deadline tracker for jobs
	deadlineTracker := deadline.New(
		dispatcher,
		store, // store implements JobStore
		store, // store implements TaskStore
		jobFactory,
		goalStateDriver,
		rootScope,
		&cfg.JobManager.Deadline,
	)

	// Create the task status update handler, which pulls task update events
	// from the host manager once started after gaining leadership
	statusUpdate := event.NewTaskStatusUpdate(
		dispatcher,
		store, // store implements JobStore
		store, // store implements TaskStore
		store, // store implements VolumeStore
		jobFactory,
		goalStateDriver,
		[]event.Listener{},
		rootScope,
		cfg.JobManager.HostManagerAPIVersion,
	)

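	// Assemble the job manager server, which starts and stops the
	// components above when leadership is gained or lost.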
	server := jobmgr.NewServer(
		cfg.JobManager.HTTPPort,
		cfg.JobManager.GRPCPort,
		jobFactory,
		goalStateDriver,
		taskEvictor,
		deadlineTracker,
		placementProcessor,
		statusUpdate,
		backgroundManager,
		watchProcessor,
	)

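	// Register this instance as a candidate in the job manager leader
	// election; the server above receives the leadership callbacks.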
	candidate, err := leader.NewCandidate(
		cfg.Election,
		rootScope,
		common.JobManagerRole,
		server,
	)
	if err != nil {
		log.Fatalf("Unable to create leader candidate: %v", err)
	}

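	// Register the RPC service handlers on the dispatcher: v0 job, task,
	// volume and update, the private job service, the v1alpha stateless
	// job and pod services, and the admin service.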
	jobsvc.InitServiceHandler(
		dispatcher,
		rootScope,
		store, // store implements JobStore
		store, // store implements TaskStore
		ormStore,
		jobFactory,
		goalStateDriver,
		candidate,
		common.PelotonResourceManager, // TODO: to be removed
		cfg.JobManager.JobSvcCfg,
	)

	private.InitPrivateJobServiceHandler(
		dispatcher,
		store,
		store,
		store,
		ormStore,
		jobFactory,
		goalStateDriver,
		candidate,
	)

	stateless.InitV1AlphaJobServiceHandler(
		dispatcher,
		store,
		store,
		store,
		ormStore,
		jobFactory,
		goalStateDriver,
		candidate,
		cfg.JobManager.JobSvcCfg,
		activeJobCache,
	)

	tasksvc.InitServiceHandler(
		dispatcher,
		rootScope,
		ormStore,
		store, // store implements TaskStore
		store, // store implements UpdateStore
		store, // store implements FrameworkInfoStore
		jobFactory,
		goalStateDriver,
		candidate,
		*mesosAgentWorkDir,
		common.PelotonHostManager,
		logmanager.NewLogManager(&http.Client{Timeout: _httpClientTimeout}),
		activeJobCache,
		cfg.JobManager.HostManagerAPIVersion,
	)

	podsvc.InitV1AlphaPodServiceHandler(
		dispatcher,
		store,
		store,
		store,
		ormStore,
		jobFactory,
		goalStateDriver,
		candidate,
		logmanager.NewLogManager(&http.Client{Timeout: _httpClientTimeout}),
		*mesosAgentWorkDir,
		hostsvc.NewInternalHostServiceYARPCClient(dispatcher.ClientConfig(common.PelotonHostManager)),
	)

	volumesvc.InitServiceHandler(
		dispatcher,
		rootScope,
	)

	updatesvc.InitServiceHandler(
		dispatcher,
		rootScope,
		ormStore,
		store, // store implements UpdateStore
		goalStateDriver,
		jobFactory,
	)

	adminsvc.InitServiceHandler(
		dispatcher,
		goalStateDriver,
		apiLockInboundMiddleware,
	)

	// Start dispatch loop
	if err := dispatcher.Start(); err != nil {
		log.Fatalf("Could not start rpc server: %v", err)
	}

	err = candidate.Start()
	if err != nil {
		log.Fatalf("Unable to start leader candidate: %v", err)
	}
	defer candidate.Stop()

	log.WithFields(log.Fields{
		"httpPort": cfg.JobManager.HTTPPort,
		"grpcPort": cfg.JobManager.GRPCPort,
	}).Info("Started job manager")

	// we can *honestly* say the server is booted up now
	health.InitHeartbeat(rootScope, cfg.Health, candidate)

	// start collecting runtime metrics
	defer metrics.StartCollectingRuntimeMetrics(
		rootScope,
		cfg.Metrics.RuntimeMetrics.Enabled,
		cfg.Metrics.RuntimeMetrics.CollectInterval)()

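	// Block the main goroutine forever.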
	select {}
}
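
The flag variables dereferenced above (debug, cfgFiles, httpPort, grpcPort, and the
rest) are kingpin flag pointers declared at package level elsewhere in
cmd/jobmgr/main.go, outside the range shown here. Below is a minimal sketch of how
such declarations typically look with kingpin; the flag names, help strings, and
defaults are illustrative assumptions, not the file's actual definitions.

package main

import (
	"gopkg.in/alecthomas/kingpin.v2"
)

// Sketch only: assumed kingpin flag declarations. The real file defines many
// more flags (Cassandra, election, rate limits, and so on) following the same
// pattern.
var (
	app = kingpin.New("peloton-jobmgr", "Peloton Job Manager")

	debug = app.Flag("debug", "Enable debug logging").
		Short('d').
		Default("false").
		Bool()

	cfgFiles = app.Flag("config", "YAML config file; can be repeated").
		Short('c').
		Strings()

	httpPort = app.Flag("http-port", "Override the job manager HTTP port from config").
		Default("0").
		Int()

	grpcPort = app.Flag("grpc-port", "Override the job manager gRPC port from config").
		Default("0").
		Int()
)

The call kingpin.MustParse(app.Parse(os.Args[1:])) at the top of main then populates
these pointers before the CLI overrides are applied to the loaded config.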