cmd/aurorabridge/main.go (284 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "net/http" "os" "time" "github.com/uber/peloton/.gen/peloton/api/v0/respool" statelesssvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless/svc" podsvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod/svc" watchsvc "github.com/uber/peloton/.gen/peloton/api/v1alpha/watch/svc" "github.com/uber/peloton/.gen/peloton/private/jobmgrsvc" "github.com/uber/peloton/.gen/thrift/aurora/api/auroraschedulermanagerserver" "github.com/uber/peloton/.gen/thrift/aurora/api/readonlyschedulerserver" "github.com/uber/peloton/pkg/aurorabridge/cache" auth_impl "github.com/uber/peloton/pkg/auth/impl" "github.com/uber/peloton/pkg/aurorabridge" bridgecommon "github.com/uber/peloton/pkg/aurorabridge/common" "github.com/uber/peloton/pkg/auth" "github.com/uber/peloton/pkg/common" "github.com/uber/peloton/pkg/common/buildversion" "github.com/uber/peloton/pkg/common/config" "github.com/uber/peloton/pkg/common/health" "github.com/uber/peloton/pkg/common/leader" "github.com/uber/peloton/pkg/common/logging" "github.com/uber/peloton/pkg/common/metrics" "github.com/uber/peloton/pkg/common/rpc" "github.com/uber/peloton/pkg/hostmgr/mesos/yarpc/peer" "github.com/uber/peloton/pkg/middleware/inbound" "github.com/uber/peloton/pkg/middleware/outbound" log "github.com/sirupsen/logrus" "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" "go.uber.org/yarpc/transport/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" ) var ( version string app = kingpin.New("peloton-aurorabridge", "Peloton Aurora bridge") enableSentry = app.Flag( "enable-sentry", "enable logging hook up to sentry"). Default("false"). Envar("ENABLE_SENTRY_LOGGING"). Bool() cfgFiles = app.Flag( "config", "YAML config files (can be provided multiple times to merge configs)"). Short('c'). Required(). ExistingFiles() electionZkServers = app.Flag( "election-zk-server", "Election Zookeeper servers. Specify multiple times for multiple servers "+ "(election.zk_servers override) (set $ELECTION_ZK_SERVERS to override)"). Envar("ELECTION_ZK_SERVERS"). Strings() datacenter = app.Flag( "datacenter", "Datacenter name"). Default(""). Envar("DATACENTER"). String() httpPort = app.Flag( "http-port", "Aurora Bridge HTTP port (aurorabridge.http_port override) "+ "(set $PORT to override)"). Default("5396"). Envar("HTTP_PORT"). Int() grpcPort = app.Flag( "grpc-port", "Aurora Bridge gRPC port (aurorabridge.grpc_port override) "+ "(set $PORT to override)"). Envar("GRPC_PORT"). Int() respoolPath = app.Flag( "respool-path", "Aurora Bridge Resource Pool path"). Envar("RESPOOL_PATH"). String() gpuRespoolPath = app.Flag( "gpu-respool-path", "Aurora Bridge GPU Resource Pool path"). Envar("GPU_RESPOOL_PATH"). String() authType = app.Flag( "auth-type", "Define the auth type used, default to NOOP"). Default("NOOP"). Envar("AUTH_TYPE"). Enum("NOOP", "BASIC") authConfigFile = app.Flag( "auth-config-file", "config file for the auth feature, which is specific to the auth type used"). Default(""). Envar("AUTH_CONFIG_FILE"). String() enableInPlace = app.Flag( "enable-inplace-update", "enable in-place update"). Default("false"). Envar("ENABLE_INPLACE_UPDATE"). Bool() ) func main() { app.Version(version) app.HelpFlag.Short('h') kingpin.MustParse(app.Parse(os.Args[1:])) log.SetFormatter( &logging.LogFieldFormatter{ Formatter: &log.JSONFormatter{}, Fields: log.Fields{ common.AppLogField: app.Name, }, }, ) var cfg Config if err := config.Parse(&cfg, *cfgFiles...); err != nil { log.Fatalf("Error parsing yaml config: %s", err) } if *enableSentry { logging.ConfigureSentry(&cfg.SentryConfig) } if len(*electionZkServers) > 0 { cfg.Election.ZKServers = *electionZkServers } if *httpPort != 0 { cfg.HTTPPort = *httpPort } if *grpcPort != 0 { cfg.GRPCPort = *grpcPort } if len(*respoolPath) > 0 { cfg.RespoolLoader.RespoolPath = *respoolPath } if len(*gpuRespoolPath) > 0 { cfg.RespoolLoader.GPURespoolPath = *gpuRespoolPath } // Parse and setup peloton auth if len(*authType) != 0 { cfg.Auth.AuthType = auth.Type(*authType) cfg.Auth.Path = *authConfigFile } if *enableInPlace { cfg.ServiceHandler.EnableInPlace = true } initialLevel := log.InfoLevel if cfg.Debug { initialLevel = log.DebugLevel } log.SetLevel(initialLevel) log.WithField("config", cfg).Info("Loaded AuroraBridge configuration") rootScope, scopeCloser, mux := metrics.InitMetricScope( &cfg.Metrics, common.PelotonAuroraBridge, metrics.TallyFlushInterval, ) defer scopeCloser.Close() mux.HandleFunc( logging.LevelOverwrite, logging.LevelOverwriteHandler(initialLevel)) mux.HandleFunc(buildversion.Get, buildversion.Handler(version)) // Create both HTTP and GRPC inbounds inbounds := rpc.NewAuroraBridgeInbounds( cfg.HTTPPort, cfg.GRPCPort, // dummy grpc port for aurora bridge mux) discovery, err := leader.NewZkServiceDiscovery( cfg.Election.ZKServers, cfg.Election.Root) if err != nil { log.WithError(err). Fatal("Could not create zk service discovery") } clientRecvOption := grpc.ClientMaxRecvMsgSize(cfg.EventPublisher.GRPCMsgSize) serverRecvOption := grpc.ServerMaxRecvMsgSize(cfg.EventPublisher.GRPCMsgSize) t := grpc.NewTransport( clientRecvOption, serverRecvOption, ) outbounds := yarpc.Outbounds{ common.PelotonJobManager: transport.Outbounds{ Unary: t.NewOutbound( peer.NewPeerChooser(t, 1*time.Second, discovery.GetAppURL, common.JobManagerRole), ), Stream: t.NewOutbound( peer.NewPeerChooser(t, 1*time.Second, discovery.GetAppURL, common.JobManagerRole), ), }, common.PelotonResourceManager: transport.Outbounds{ Unary: t.NewOutbound( peer.NewPeerChooser(t, 1*time.Second, discovery.GetAppURL, common.ResourceManagerRole), ), }, } securityManager, err := auth_impl.CreateNewSecurityManager(&cfg.Auth) if err != nil { log.WithError(err). Fatal("Could not enable security feature") } rateLimitMiddleware, err := inbound.NewRateLimitInboundMiddleware(cfg.RateLimit) if err != nil { log.WithError(err). Fatal("Could not create rate limit middleware") } authInboundMiddleware := inbound.NewAuthInboundMiddleware(securityManager) securityClient, err := auth_impl.CreateNewSecurityClient(&cfg.Auth) if err != nil { log.WithError(err). Fatal("Could not establish secure inter-component communication") } authOutboundMiddleware := outbound.NewAuthOutboundMiddleware(securityClient) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: common.PelotonAuroraBridge, Inbounds: inbounds, Outbounds: outbounds, Metrics: yarpc.MetricsConfig{ Tally: rootScope, }, InboundMiddleware: yarpc.InboundMiddleware{ Unary: yarpc.UnaryInboundMiddleware(rateLimitMiddleware, authInboundMiddleware), Stream: yarpc.StreamInboundMiddleware(rateLimitMiddleware, authInboundMiddleware), Oneway: yarpc.OnewayInboundMiddleware(rateLimitMiddleware, authInboundMiddleware), }, OutboundMiddleware: yarpc.OutboundMiddleware{ Unary: authOutboundMiddleware, Stream: authOutboundMiddleware, Oneway: authOutboundMiddleware, }, }) jobClient := statelesssvc.NewJobServiceYARPCClient( dispatcher.ClientConfig(common.PelotonJobManager)) jobmgrClient := jobmgrsvc.NewJobManagerServiceYARPCClient( dispatcher.ClientConfig(common.PelotonJobManager)) podClient := podsvc.NewPodServiceYARPCClient( dispatcher.ClientConfig(common.PelotonJobManager)) respoolClient := respool.NewResourceManagerYARPCClient( dispatcher.ClientConfig(common.PelotonResourceManager)) watchClient := watchsvc.NewWatchServiceYARPCClient( dispatcher.ClientConfig(common.PelotonJobManager)) // Start the dispatcher before we register the aurorabridge handler, since we'll // need to make some outbound requests to get things setup. if err := dispatcher.Start(); err != nil { log.Fatalf("Could not start rpc server: %v", err) } eventPublisher := aurorabridge.NewEventPublisher( cfg.EventPublisher.KafkaURL, jobClient, podClient, watchClient, &http.Client{}, cfg.EventPublisher.PublishEvents, ) server, err := aurorabridge.NewServer( cfg.HTTPPort, cfg.Election, eventPublisher, common.PelotonAuroraBridgeRole, ) if err != nil { log.Fatalf("Unable to create server: %v", err) } candidate, err := leader.NewCandidate( cfg.Election, rootScope, common.PelotonAuroraBridgeRole, server, ) if err != nil { log.Fatalf("Unable to create leader candidate: %v", err) } respoolLoader := aurorabridge.NewRespoolLoader(cfg.RespoolLoader, respoolClient) handler, err := aurorabridge.NewServiceHandler( cfg.ServiceHandler, rootScope, jobClient, jobmgrClient, podClient, respoolLoader, bridgecommon.RandomImpl{}, cache.NewJobIDCache(), ) if err != nil { log.Fatalf("Unable to create service handler: %v", err) } dispatcher.Register(auroraschedulermanagerserver.New(handler)) dispatcher.Register(readonlyschedulerserver.New(handler)) if err := candidate.Start(); err != nil { log.Fatalf("Unable to start leader candidate: %v", err) } defer candidate.Stop() log.WithFields(log.Fields{ "httpPort": cfg.HTTPPort, }).Info("Started Aurora Bridge") // we can *honestly* say the server is booted up now health.InitHeartbeat(rootScope, cfg.Health, candidate) select {} }