in cmd/apiserver/main.go [92:323]
func main() {
app.Version(version)
app.HelpFlag.Short('h')
kingpin.MustParse(app.Parse(os.Args[1:]))
// Setup logging.
log.SetFormatter(
&logging.LogFieldFormatter{
Formatter: &logging.SecretsFormatter{Formatter: &log.JSONFormatter{}},
Fields: log.Fields{
common.AppLogField: app.Name,
},
},
)
initialLevel := log.InfoLevel
if *debug {
initialLevel = log.DebugLevel
}
log.SetLevel(initialLevel)
// Load and override API Server configurations.
log.WithField("files", *cfgFiles).Info("Loading API Server config")
var cfg Config
if err := config.Parse(&cfg, *cfgFiles...); err != nil {
log.WithField("error", err).Fatal("Cannot parse yaml config")
}
if *httpPort != 0 {
cfg.APIServer.HTTPPort = *httpPort
}
if *grpcPort != 0 {
cfg.APIServer.GRPCPort = *grpcPort
}
if len(*electionZkServers) > 0 {
cfg.Election.ZKServers = *electionZkServers
}
// Parse and setup Peloton authentication.
if len(*authType) != 0 {
cfg.Auth.AuthType = auth.Type(*authType)
cfg.Auth.Path = *authConfigFile
}
log.WithField("config", cfg).Info("Loaded API Server configuration")
// Configure tally metrics.
rootScope, scopeCloser, mux := metrics.InitMetricScope(
&cfg.Metrics,
common.PelotonAPIServer,
metrics.TallyFlushInterval,
)
defer scopeCloser.Close()
// Configure log and version handlers.
mux.HandleFunc(
logging.LevelOverwrite,
logging.LevelOverwriteHandler(initialLevel))
mux.HandleFunc(buildversion.Get, buildversion.Handler(version))
// Create both HTTP and GRPC inbounds.
inbounds := rpc.NewInbounds(
cfg.APIServer.HTTPPort,
cfg.APIServer.GRPCPort,
mux,
)
// All leader discovery metrics share a scope (and will be tagged
// with role={role}).
discoveryScope := rootScope.SubScope("discovery")
// Setup the discovery service to detect host leaders and
// configure the YARPC Peer dynamically.
t := rpc.NewTransport()
// Setup resmgr peer chooser.
resmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
discoveryScope,
common.ResourceManagerRole,
t,
)
if err != nil {
log.WithFields(
log.Fields{"error": err, "role": common.ResourceManagerRole},
).Fatal("Could not create smart peer chooser")
}
defer resmgrPeerChooser.Stop()
// Create resmgr outbound.
resmgrOutbound := t.NewOutbound(resmgrPeerChooser)
// Setup hostmgr peer chooser.
hostmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
discoveryScope,
common.HostManagerRole,
t,
)
if err != nil {
log.WithFields(
log.Fields{"error": err, "role": common.HostManagerRole},
).Fatal("Could not create smart peer chooser")
}
defer hostmgrPeerChooser.Stop()
// Create hostmgr outbound.
hostmgrOutbound := t.NewOutbound(hostmgrPeerChooser)
// Setup jobmgr peer chooser.
jobmgrPeerChooser, err := peer.NewSmartChooser(
cfg.Election,
discoveryScope,
common.JobManagerRole,
t,
)
if err != nil {
log.WithError(err).
WithField("role:", common.JobManagerRole).
Fatal("Could not create smart peer chooser")
}
defer jobmgrPeerChooser.Stop()
// Create jobmgr outbound.
jobmgrOutbound := t.NewOutbound(jobmgrPeerChooser)
// Add all required outbounds.
outbounds := yarpc.Outbounds{
common.PelotonResourceManager: transport.Outbounds{
Unary: resmgrOutbound,
Stream: resmgrOutbound,
},
common.PelotonHostManager: transport.Outbounds{
Unary: hostmgrOutbound,
Stream: hostmgrOutbound,
},
common.PelotonJobManager: transport.Outbounds{
Unary: jobmgrOutbound,
Stream: jobmgrOutbound,
},
}
// Create security manager for inbound authentication middleware.
securityManager, err := authimpl.CreateNewSecurityManager(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not enable security feature")
}
// Setup inbound rate limit middleware.
rateLimitMiddleware, err := inbound.NewRateLimitInboundMiddleware(cfg.RateLimit)
if err != nil {
log.WithError(err).
Fatal("Could not create rate limit middleware")
}
// Setup inbound authentication middleware.
authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager)
// Create security client for outbound authentication middleware.
securityClient, err := authimpl.CreateNewSecurityClient(&cfg.Auth)
if err != nil {
log.WithError(err).
Fatal("Could not establish secure inter-component communication")
}
// Setup outbound authentication middleware.
authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient)
// Create YARPC dispatcher.
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: common.PelotonAPIServer,
Inbounds: inbounds,
Outbounds: outbounds,
Metrics: yarpc.MetricsConfig{
Tally: rootScope,
},
InboundMiddleware: yarpc.InboundMiddleware{
Unary: yarpc.UnaryInboundMiddleware(rateLimitMiddleware, authInboundMiddleware),
Stream: yarpc.StreamInboundMiddleware(rateLimitMiddleware, authInboundMiddleware),
Oneway: yarpc.OnewayInboundMiddleware(rateLimitMiddleware, authInboundMiddleware),
},
OutboundMiddleware: yarpc.OutboundMiddleware{
Unary: authOutboundMiddleware,
Stream: authOutboundMiddleware,
Oneway: authOutboundMiddleware,
},
})
// Register service procedures in dispatcher.
var procedures []transport.Procedure
procedures = append(
procedures,
apiserver.BuildHostManagerProcedures(
outbounds[common.PelotonHostManager],
)...,
)
procedures = append(
procedures,
apiserver.BuildJobManagerProcedures(
outbounds[common.PelotonJobManager],
)...,
)
procedures = append(
procedures,
apiserver.BuildResourceManagerProcedures(
outbounds[common.PelotonResourceManager],
)...,
)
dispatcher.Register(procedures)
for _, p := range procedures {
log.Debug("Registering procedure: " + p.Name)
}
// Start YARPC dispatcher.
if err := dispatcher.Start(); err != nil {
log.Fatalf("Could not start rpc server: %v", err)
}
defer dispatcher.Stop()
// Start collecting runtime metrics。
defer metrics.StartCollectingRuntimeMetrics(
rootScope,
cfg.Metrics.RuntimeMetrics.Enabled,
cfg.Metrics.RuntimeMetrics.CollectInterval)()
// Block the main process.
select {}
}