commands/multi.go (402 lines of code) (raw):

package commands

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	_ "net/http/pprof" // pprof registers its HTTP handlers in its init() function
	"os"
	"os/signal"
	"runtime"
	"syscall"
	"time"

	service "github.com/ayufan/golang-kardianos-service"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli"

	log "github.com/Sirupsen/logrus"

	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/cli"
	prometheus_helper "gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/prometheus"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/sentry"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers/service"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/network"
)

type RunCommand struct {
	configOptionsWithMetricsServer
	network common.Network
	healthHelper
	buildsHelper buildsHelper

	ServiceName      string `short:"n" long:"service" description:"Use different names for different services"`
	WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"`
	User             string `short:"u" long:"user" description:"Use specific user to execute shell scripts"`
	Syslog           bool   `long:"syslog" description:"Log to syslog"`

	sentryLogHook     sentry.LogHook
	prometheusLogHook prometheus_helper.LogHook

	// abortBuilds is used to abort running builds
	abortBuilds chan os.Signal

	// runSignal is used to abort the current operation (scaling workers, waiting for config)
	runSignal chan os.Signal

	// reloadSignal is used to trigger a forceful config reload
	reloadSignal chan os.Signal

	// stopSignals is used to catch signals sent to the process: SIGTERM, SIGQUIT, Interrupt, Kill
	stopSignals chan os.Signal

	// stopSignal preserves the signal that was used to stop the process.
	// If it is SIGQUIT, all running builds are allowed to finish.
	stopSignal os.Signal

	// runFinished is used to notify that Run() did finish
	runFinished chan bool

	currentWorkers int
}

func (mr *RunCommand) log() *log.Entry {
	return log.WithField("builds", mr.buildsHelper.buildsCount())
}

func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) {
	if !mr.isHealthy(runner.UniqueID()) {
		return
	}

	runners <- runner
}

func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
	for mr.stopSignal == nil {
		mr.log().Debugln("Feeding runners to channel")
		config := mr.config

		// If there are no runners, wait the full interval before checking again
		if len(config.Runners) == 0 {
			time.Sleep(config.GetCheckInterval())
			continue
		}

		interval := config.GetCheckInterval() / time.Duration(len(config.Runners))

		// Feed each runner, spacing them evenly across the check interval
		for _, runner := range config.Runners {
			mr.feedRunner(runner, runners)
			time.Sleep(interval)
		}
	}
}

func (mr *RunCommand) requestJob(runner *common.RunnerConfig) (*common.JobResponse, bool) {
	if !mr.buildsHelper.acquireRequest(runner) {
		return nil, false
	}
	defer mr.buildsHelper.releaseRequest(runner)

	jobData, healthy := mr.network.RequestJob(*runner)
	mr.makeHealthy(runner.UniqueID(), healthy)
	return jobData, true
}

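// processRunner handles one job cycle for a single runner: it acquires an
// executor slot and a build slot, asks GitLab for a job, requeues the runner
// so another worker can poll it in parallel, and finally runs the build.
// A minimal sketch of how a worker invokes it (names as defined in this file):
//
//	runner := <-runners // fed by feedRunners()
//	_ = mr.processRunner(id, runner, runners)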
func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig) (err error) {
	provider := common.GetExecutor(runner.Executor)
	if provider == nil {
		return
	}

	context, err := provider.Acquire(runner)
	if err != nil {
		log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
		return
	}
	defer provider.Release(runner, context)

	// Acquire build slot
	if !mr.buildsHelper.acquireBuild(runner) {
		mr.log().WithField("runner", runner.ShortDescription()).
			Debugln("Failed to request job: runner limit met")
		return
	}
	defer mr.buildsHelper.releaseBuild(runner)

	// Receive a new build
	jobData, result := mr.requestJob(runner)
	if !result {
		mr.log().WithField("runner", runner.ShortDescription()).
			Debugln("Failed to request job: runner requestConcurrency met")
		return
	}
	if jobData == nil {
		return
	}

	// Make sure to always close output
	jobCredentials := &common.JobCredentials{
		ID:    jobData.ID,
		Token: jobData.Token,
	}
	trace := mr.network.ProcessJob(*runner, jobCredentials)
	// Wrap in a closure so that the named return err is evaluated on exit,
	// not at defer time, and the trace is closed with the final result
	defer func() { trace.Fail(err) }()

	// Create a new build
	build := &common.Build{
		JobResponse:     *jobData,
		Runner:          runner,
		ExecutorData:    context,
		SystemInterrupt: mr.abortBuilds,
	}

	// Add build to the list of builds to assign numbers to
	mr.buildsHelper.addBuild(build)
	defer mr.buildsHelper.removeBuild(build)

	// Requeue the same runner for another worker
	// to speed up picking up new builds
	select {
	case runners <- runner:
		mr.log().WithField("runner", runner.ShortDescription()).Debugln("Requeued the runner")
	default:
		mr.log().WithField("runner", runner.ShortDescription()).Debugln("Failed to requeue the runner")
	}

	// Process a build
	return build.Run(mr.config, trace)
}

func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
	mr.log().WithField("worker", id).Debugln("Starting worker")
	for mr.stopSignal == nil {
		select {
		case runner := <-runners:
			mr.processRunner(id, runner, runners)

			// force a GC cycle after processing a build
			runtime.GC()

		case <-stopWorker:
			mr.log().WithField("worker", id).Debugln("Stopping worker")
			return
		}
	}
	<-stopWorker
}

func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
	for mr.stopSignal == nil {
		id := <-startWorker
		go mr.processRunners(id, stopWorker, runners)
	}
}

func (mr *RunCommand) loadConfig() error {
	err := mr.configOptions.loadConfig()
	if err != nil {
		return err
	}

	// Set log level
	if !cli_helpers.CustomLogLevelSet && mr.config.LogLevel != nil {
		level, err := log.ParseLevel(*mr.config.LogLevel)
		if err != nil {
			log.Fatalln(err)
		}
		log.SetLevel(level)
	}

	// pass user to execute scripts as a specific user
	if mr.User != "" {
		mr.config.User = mr.User
	}

	mr.healthy = nil
	mr.log().Println("Configuration loaded")
	mr.log().Debugln(helpers.ToYAML(mr.config))

	// initialize sentry
	if mr.config.SentryDSN != nil {
		var err error
		mr.sentryLogHook, err = sentry.NewLogHook(*mr.config.SentryDSN)
		if err != nil {
			mr.log().WithError(err).Errorln("Sentry failure")
		}
	} else {
		mr.sentryLogHook = sentry.LogHook{}
	}

	return nil
}

func (mr *RunCommand) checkConfig() (err error) {
	info, err := os.Stat(mr.ConfigFile)
	if err != nil {
		return err
	}

	if !mr.config.ModTime.Before(info.ModTime()) {
		return nil
	}

	err = mr.loadConfig()
	if err != nil {
		mr.log().Errorln("Failed to load config", err)
		// don't reload the same file
		mr.config.ModTime = info.ModTime()
		return
	}
	return nil
}

func (mr *RunCommand) Start(s service.Service) error {
	mr.abortBuilds = make(chan os.Signal)
	mr.runSignal = make(chan os.Signal, 1)
	mr.reloadSignal = make(chan os.Signal, 1)
	mr.runFinished = make(chan bool, 1)
	mr.stopSignals = make(chan os.Signal)
	mr.log().Println("Starting multi-runner from", mr.ConfigFile, "...")

	userModeWarning(false)

	if len(mr.WorkingDirectory) > 0 {
		err := os.Chdir(mr.WorkingDirectory)
		if err != nil {
			return err
		}
	}

	err := mr.loadConfig()
	if err != nil {
		return err
	}

	// Start should not block. Do the actual work async.
	go mr.Run()

	return nil
}

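// updateWorkers reconciles the worker pool with the configured `concurrent`
// limit: it stops workers while there are too many and starts new ones while
// there are too few, aborting either loop if a run signal arrives. For
// example (hypothetical config), with concurrent = 4 and two workers running,
// two more ids are sent into startWorker.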
func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
	buildLimit := mr.config.Concurrent

	if buildLimit < 1 {
		mr.log().Fatalln("Concurrent is less than 1 - no jobs will be processed")
	}

	for mr.currentWorkers > buildLimit {
		select {
		case stopWorker <- true:
		case signaled := <-mr.runSignal:
			return signaled
		}
		mr.currentWorkers--
	}

	for mr.currentWorkers < buildLimit {
		select {
		case startWorker <- *workerIndex:
		case signaled := <-mr.runSignal:
			return signaled
		}
		mr.currentWorkers++
		*workerIndex++
	}

	return nil
}

func (mr *RunCommand) updateConfig() os.Signal {
	select {
	case <-time.After(common.ReloadConfigInterval * time.Second):
		err := mr.checkConfig()
		if err != nil {
			mr.log().Errorln("Failed to load config", err)
		}

	case <-mr.reloadSignal:
		err := mr.loadConfig()
		if err != nil {
			mr.log().Errorln("Failed to load config", err)
		}

	case signaled := <-mr.runSignal:
		return signaled
	}
	return nil
}

func (mr *RunCommand) runWait() {
	mr.log().Debugln("Waiting for stop signal")

	// Save the stop signal and exit to execute Stop()
	mr.stopSignal = <-mr.stopSignals
}

func (mr *RunCommand) serveMetrics() {
	registry := prometheus.NewRegistry()
	// Metrics about the runner's business logic.
	registry.MustRegister(&mr.buildsHelper)
	// Metrics about caught errors.
	registry.MustRegister(&mr.prometheusLogHook)
	// Metrics about the program's build version.
	registry.MustRegister(common.AppVersion.NewMetricsCollector())
	// Go-specific metrics about the process (GC stats, goroutines, etc.).
	registry.MustRegister(prometheus.NewGoCollector())
	// Go-unrelated process metrics (memory usage, file descriptors, etc.).
	registry.MustRegister(prometheus.NewProcessCollector(os.Getpid(), ""))

	// Register all executor provider collectors
	for _, provider := range common.GetExecutorProviders() {
		if collector, ok := provider.(prometheus.Collector); ok && collector != nil {
			registry.MustRegister(collector)
		}
	}

	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
}

func (mr *RunCommand) serveDebugData() {
	http.Handle("/debug/jobs/list", http.HandlerFunc(mr.buildsHelper.ListJobsHandler))
}

func (mr *RunCommand) setupMetricsAndDebugServer() {
	serverAddress, err := mr.metricsServerAddress()
	if err != nil {
		mr.log().Errorf("invalid metrics server address: %s", err.Error())
		return
	}

	if serverAddress == "" {
		log.Infoln("Metrics server disabled")
		return
	}

	// We separate out the listener creation here so that we can fail fast if
	// the provided address is invalid or there is some other listener error.
	listener, err := net.Listen("tcp", serverAddress)
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		log.Fatalln(http.Serve(listener, nil))
	}()

	mr.serveMetrics()
	mr.serveDebugData()

	log.Infoln("Metrics server listening at", serverAddress)
}

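// Run is the main service loop: it starts the metrics/debug server, feeds
// runner configs to the worker pool, and rescales the pool on every config
// reload until a stop signal arrives. Once the server above is listening,
// the registered endpoints can be probed (hypothetical host:port shown; the
// real address comes from metricsServerAddress()):
//
//	curl http://localhost:9252/metrics          # Prometheus metrics
//	curl http://localhost:9252/debug/jobs/list  # currently handled jobs
//	curl http://localhost:9252/debug/pprof/     # pprof index (via the blank import above)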
func (mr *RunCommand) Run() {
	mr.setupMetricsAndDebugServer()

	runners := make(chan *common.RunnerConfig)
	go mr.feedRunners(runners)

	signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt, os.Kill)
	signal.Notify(mr.reloadSignal, syscall.SIGHUP)

	startWorker := make(chan int)
	stopWorker := make(chan bool)
	go mr.startWorkers(startWorker, stopWorker, runners)

	workerIndex := 0
	for mr.stopSignal == nil {
		signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker)
		if signaled != nil {
			break
		}

		signaled = mr.updateConfig()
		if signaled != nil {
			break
		}
	}

	// Wait for workers to shut down
	for mr.currentWorkers > 0 {
		stopWorker <- true
		mr.currentWorkers--
	}
	mr.log().Println("All workers stopped. Can exit now")
	mr.runFinished <- true
}

func (mr *RunCommand) interruptRun() {
	// Pump the stop signal to interrupt the run loop
	for {
		mr.runSignal <- mr.stopSignal
	}
}

func (mr *RunCommand) abortAllBuilds() {
	// Pump the signal to abort all current builds
	for {
		mr.abortBuilds <- mr.stopSignal
	}
}

func (mr *RunCommand) handleGracefulShutdown() error {
	// Wait as long as the stop signal is SIGQUIT
	for mr.stopSignal == syscall.SIGQUIT {
		mr.log().Warningln("Requested quit, waiting for builds to finish")

		// Wait for other signals or for the builds to finish
		select {
		case mr.stopSignal = <-mr.stopSignals:
			// We received a new signal

		case <-mr.runFinished:
			// Everything finished; we can exit now
			return nil
		}
	}

	return fmt.Errorf("received: %v", mr.stopSignal)
}

func (mr *RunCommand) handleShutdown() error {
	mr.log().Warningln("Requested service stop:", mr.stopSignal)

	go mr.abortAllBuilds()

	// Wait for graceful shutdown or abort after timeout
	for {
		select {
		case mr.stopSignal = <-mr.stopSignals:
			return fmt.Errorf("forced exit: %v", mr.stopSignal)

		case <-time.After(common.ShutdownTimeout * time.Second):
			return errors.New("shutdown timed out")

		case <-mr.runFinished:
			// Everything finished; we can exit now
			return nil
		}
	}
}

func (mr *RunCommand) Stop(s service.Service) (err error) {
	go mr.interruptRun()
	err = mr.handleGracefulShutdown()
	if err == nil {
		return
	}
	err = mr.handleShutdown()
	return
}

func (mr *RunCommand) Execute(context *cli.Context) {
	svcConfig := &service.Config{
		Name:        mr.ServiceName,
		DisplayName: mr.ServiceName,
		Description: defaultDescription,
		Arguments:   []string{"run"},
		Option: service.KeyValue{
			"RunWait": mr.runWait,
		},
	}

	// Named svc to avoid shadowing the imported service package
	svc, err := service_helpers.New(mr, svcConfig)
	if err != nil {
		log.Fatalln(err)
	}

	if mr.Syslog {
		log.SetFormatter(new(log.TextFormatter))
		logger, err := svc.SystemLogger(nil)
		if err == nil {
			log.AddHook(&ServiceLogHook{logger, log.InfoLevel})
		} else {
			log.Errorln(err)
		}
	}

	log.AddHook(&mr.sentryLogHook)
	log.AddHook(&mr.prometheusLogHook)

	err = svc.Run()
	if err != nil {
		log.Fatalln(err)
	}
}

func init() {
	common.RegisterCommand2("run", "run multi runner service", &RunCommand{
		ServiceName:       defaultServiceName,
		network:           network.NewGitLabClient(),
		prometheusLogHook: prometheus_helper.NewLogHook(),
	})
}
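// Example invocation (hypothetical paths; flag names taken from the struct
// tags on RunCommand):
//
//	gitlab-ci-multi-runner run \
//		--service gitlab-runner \
//		--working-directory /home/gitlab-runner \
//		--user gitlab-runner \
//		--syslog
//
// Signal behaviour, as implemented above: SIGHUP forces a config reload;
// SIGQUIT waits for running builds to finish; any other stop signal (or a
// second signal during a graceful shutdown) aborts builds, giving up after
// common.ShutdownTimeout seconds.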