commands/multi.go (1,056 lines of code) (raw):

package commands import ( "context" "errors" "fmt" "net" "net/http" "net/http/pprof" "os" "os/signal" "path/filepath" "runtime" "sync" "syscall" "time" "github.com/kardianos/service" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "github.com/urfave/cli" "gitlab.com/gitlab-org/gitlab-runner/common" "gitlab.com/gitlab-org/gitlab-runner/helpers" "gitlab.com/gitlab-org/gitlab-runner/helpers/certificate" "gitlab.com/gitlab-org/gitlab-runner/helpers/featureflags" prometheus_helper "gitlab.com/gitlab-org/gitlab-runner/helpers/prometheus" "gitlab.com/gitlab-org/gitlab-runner/helpers/sentry" service_helpers "gitlab.com/gitlab-org/gitlab-runner/helpers/service" "gitlab.com/gitlab-org/gitlab-runner/helpers/usage_log" "gitlab.com/gitlab-org/gitlab-runner/helpers/usage_log/logrotate" "gitlab.com/gitlab-org/gitlab-runner/log" "gitlab.com/gitlab-org/gitlab-runner/network" "gitlab.com/gitlab-org/gitlab-runner/session" ) const ( workerSlotOperationStarted = "started" workerSlotOperationStopped = "stopped" ) const ( workerProcessingFailureOther = "other" workerProcessingFailureNoFreeExecutor = "no_free_executor" workerProcessingFailureJobFailure = "job_failure" ) var ( concurrentDesc = prometheus.NewDesc( "gitlab_runner_concurrent", "The current value of concurrent setting", nil, nil, ) limitDesc = prometheus.NewDesc( "gitlab_runner_limit", "The current value of concurrent setting", []string{"runner", "system_id"}, nil, ) ) type runAtTask interface { cancel() } type runAtTimerTask struct { timer *time.Timer } func (t *runAtTimerTask) cancel() { t.timer.Stop() } func runAt(t time.Time, f func()) runAtTask { timer := time.AfterFunc(time.Until(t), f) task := runAtTimerTask{ timer: timer, } return &task } type RunCommand struct { configOptionsWithListenAddress network common.Network healthHelper healthHelper buildsHelper buildsHelper ServiceName string `short:"n" long:"service" description:"Use different names for different services"` WorkingDirectory string `short:"d" long:"working-directory" description:"Specify custom working directory"` User string `short:"u" long:"user" description:"Use specific user to execute shell scripts"` Syslog bool `long:"syslog" description:"Log to system service logger" env:"LOG_SYSLOG"` // sentry.LogHook is a struct, so accesses are not atomic. Use the sentryLogHookMutex to ensure // mutual exclusion. sentryLogHookMutex sync.Mutex sentryLogHook sentry.LogHook networkMutex sync.Mutex prometheusLogHook prometheus_helper.LogHook failuresCollector *prometheus_helper.FailuresCollector apiRequestsCollector prometheus.Collector sessionServer *session.Server usageLogger *usage_log.Storage // abortBuilds is used to abort running builds abortBuilds chan os.Signal // runInterruptSignal is used to abort current operation (scaling workers, waiting for config) runInterruptSignal chan os.Signal // reloadSignal is used to trigger forceful config reload reloadSignal chan os.Signal // stopSignals is to catch a signals notified to process: SIGTERM, SIGQUIT, Interrupt, Kill stopSignals chan os.Signal // stopSignal is used to preserve the signal that was used to stop the // process In case this is SIGQUIT it makes to finish all builds and session // server. stopSignal os.Signal // configReloaded is used to notify that the config has been reloaded configReloaded chan int // runFinished is used to notify that run() did finish runFinished chan bool currentWorkers int reloadConfigInterval time.Duration runAt func(time.Time, func()) runAtTask runnerWorkerSlots prometheus.Gauge runnerWorkersFeeds *prometheus.CounterVec runnerWorkersFeedFailures *prometheus.CounterVec runnerWorkerSlotOperations *prometheus.CounterVec runnerWorkerProcessingFailure *prometheus.CounterVec } func (mr *RunCommand) log() *logrus.Entry { config := mr.getConfig() concurrent := 0 if config != nil { concurrent = config.Concurrent } return logrus.WithFields(logrus.Fields{ "builds": mr.buildsHelper.buildsCount(), "max_builds": concurrent, }) } // Start is the method implementing `github.com/kardianos/service`.`Interface` // interface. It's responsible for a non-blocking initialization of the process. When it exits, // the main control flow is passed to runWait() configured as service's RunWait method. Take a look // into Execute() for details. func (mr *RunCommand) Start(_ service.Service) error { mr.abortBuilds = make(chan os.Signal) mr.runInterruptSignal = make(chan os.Signal, 1) mr.reloadSignal = make(chan os.Signal, 1) mr.configReloaded = make(chan int, 1) mr.runFinished = make(chan bool, 1) mr.stopSignals = make(chan os.Signal) mr.log().Info("Starting multi-runner from ", mr.ConfigFile, "...") mr.setupInternalMetrics() userModeWarning(false) if mr.WorkingDirectory != "" { err := os.Chdir(mr.WorkingDirectory) if err != nil { return err } } err := mr.reloadConfig() if err != nil { return err } config := mr.getConfig() for _, runner := range config.Runners { mr.runnerWorkersFeeds.WithLabelValues(runner.ShortDescription(), runner.Name, runner.GetSystemID()).Add(0) mr.runnerWorkersFeedFailures. WithLabelValues(runner.ShortDescription(), runner.Name, runner.GetSystemID()). Add(0) mr.runnerWorkerProcessingFailure. WithLabelValues( workerProcessingFailureOther, runner.ShortDescription(), runner.Name, runner.GetSystemID(), ). Add(0) mr.runnerWorkerProcessingFailure. WithLabelValues( workerProcessingFailureNoFreeExecutor, runner.ShortDescription(), runner.Name, runner.GetSystemID(), ). Add(0) mr.runnerWorkerProcessingFailure. WithLabelValues( workerProcessingFailureJobFailure, runner.ShortDescription(), runner.Name, runner.GetSystemID(), ). Add(0) } mr.runnerWorkerSlots.Set(0) mr.runnerWorkerSlotOperations.WithLabelValues(workerSlotOperationStarted).Add(0) mr.runnerWorkerSlotOperations.WithLabelValues(workerSlotOperationStopped).Add(0) // Start should not block. Do the actual work async. go mr.run() return nil } func (mr *RunCommand) setupInternalMetrics() { mr.runnerWorkersFeeds = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitlab_runner_worker_feeds_total", Help: "Total number of times that runner worker is fed to the main loop", }, []string{"runner", "runner_name", "system_id"}, ) mr.runnerWorkersFeedFailures = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitlab_runner_worker_feed_failures_total", Help: "Total number of times that runner worker feeding have failed", }, []string{"runner", "runner_name", "system_id"}, ) mr.runnerWorkerSlots = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "gitlab_runner_worker_slots_number", Help: "Current number of runner worker slots", }) mr.runnerWorkerSlotOperations = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitlab_runner_worker_slot_operations_total", Help: "Total number of runner workers slot operations (starting and stopping slots)", }, []string{"operation"}, ) mr.runnerWorkerProcessingFailure = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitlab_runner_worker_processing_failures_total", Help: "Total number of failures when processing runner worker", }, []string{"failure_type", "runner", "runner_name", "system_id"}, ) } func nextRunnerToReset(config *common.Config) (*common.RunnerConfig, time.Time) { var runnerToReset *common.RunnerConfig var runnerResetTime time.Time for _, runner := range config.Runners { if runner.TokenExpiresAt.IsZero() { continue } expirationInterval := runner.TokenExpiresAt.Sub(runner.TokenObtainedAt) resetTime := runner.TokenObtainedAt.Add( time.Duration(common.TokenResetIntervalFactor * float64(expirationInterval.Nanoseconds())), ) if runnerToReset == nil || resetTime.Before(runnerResetTime) { runnerToReset = runner runnerResetTime = resetTime } } return runnerToReset, runnerResetTime } func (mr *RunCommand) resetRunnerTokens() { for mr.resetOneRunnerToken() { // Handling runner authentication token resetting - one by one - until mr.runFinished // reports that mr.run() have been finished } } func (mr *RunCommand) resetOneRunnerToken() bool { var task runAtTask runnerResetCh := make(chan *common.RunnerConfig) config := mr.getConfig() runnerToReset, runnerResetTime := nextRunnerToReset(config) if runnerToReset != nil { task = mr.runAt(runnerResetTime, func() { runnerResetCh <- runnerToReset }) } select { case runner := <-runnerResetCh: // When the FF is enabled, the token is not reset, however, a message is logged to warn the user // that his token is about to expire if runner.IsFeatureFlagOn(featureflags.DisableAutomaticTokenRotation) { mr.log().Warningln(fmt.Printf( "Automatic token rotation is disabled for runner: %s-%s. Your token is about to expire", runner.ShortDescription(), runner.GetSystemID(), )) return false } if common.ResetToken(mr.network, &runner.RunnerCredentials, runner.GetSystemID(), "") { err := mr.saveConfig() if err != nil { mr.log().WithError(err).Errorln("Failed to save config") } } case <-mr.runFinished: if task != nil { task.cancel() } return false case <-mr.configReloaded: if task != nil { task.cancel() } } return true } func (mr *RunCommand) reloadConfig() error { err := mr.loadConfig() if err != nil { return err } // Set log level err = mr.updateLoggingConfiguration() if err != nil { return err } mr.reloadUsageLogger() // pass user to execute scripts as specific user if mr.User != "" { mr.configMutex.Lock() mr.config.User = mr.User mr.configMutex.Unlock() } config := mr.getConfig() mr.healthHelper.healthy = nil mr.log().Println("Configuration loaded") if c, err := config.Masked(); err == nil { mr.log().Debugln(helpers.ToYAML(c)) } // initialize sentry slh := sentry.LogHook{} if config.SentryDSN != nil { var err error slh, err = sentry.NewLogHook(*config.SentryDSN) if err != nil { mr.log().WithError(err).Errorln("Sentry failure") } } mr.sentryLogHookMutex.Lock() mr.sentryLogHook = slh mr.sentryLogHookMutex.Unlock() if config.ConnectionMaxAge != nil && mr.network != nil { mr.networkMutex.Lock() mr.network.SetConnectionMaxAge(*config.ConnectionMaxAge) mr.networkMutex.Unlock() } mr.configReloaded <- 1 return nil } func (mr *RunCommand) updateLoggingConfiguration() error { reloadNeeded := false config := mr.getConfig() level := "info" if config.LogLevel != nil { level = *config.LogLevel } if !log.Configuration().IsLevelSetWithCli() { err := log.Configuration().SetLevel(level) if err != nil { return err } reloadNeeded = true } format := log.FormatRunner if config.LogFormat != nil { format = *config.LogFormat } if !log.Configuration().IsFormatSetWithCli() { err := log.Configuration().SetFormat(format) if err != nil { return err } reloadNeeded = true } if reloadNeeded { log.Configuration().ReloadConfiguration() } return nil } func (mr *RunCommand) reloadUsageLogger() { if mr.usageLogger != nil { mr.log().Debug("Closing existing usage logger storage") err := mr.usageLogger.Close() if err != nil { mr.log().WithError(err).Error("Failed to close existing usage logger storage") } } if mr.config.Experimental == nil || !mr.config.Experimental.UsageLogger.Enabled { mr.usageLogger = nil mr.log().Info("Usage logger disabled") return } ulConfig := mr.config.Experimental.UsageLogger logDir := ulConfig.LogDir if logDir == "" { logDir = filepath.Join(filepath.Dir(mr.ConfigFile), "usage-log") } options := []logrotate.Option{ logrotate.WithLogDirectory(logDir), } storageOptions := []usage_log.Option{ usage_log.WithLabels(ulConfig.Labels), } logFields := logrus.Fields{ "log_dir": logDir, } if ulConfig.MaxBackupFiles != nil && *ulConfig.MaxBackupFiles > 0 { options = append(options, logrotate.WithMaxBackupFiles(*ulConfig.MaxBackupFiles)) logFields["max_backup_files"] = *ulConfig.MaxBackupFiles } if ulConfig.MaxRotationAge != nil && ulConfig.MaxRotationAge.Nanoseconds() > 0 { options = append(options, logrotate.WithMaxRotationAge(*ulConfig.MaxRotationAge)) logFields["max_rotation_age"] = *ulConfig.MaxRotationAge } mr.log().WithFields(logFields).Info("Usage logger enabled") mr.usageLogger = usage_log.NewStorage(logrotate.New(options...), storageOptions...) } // run is the main method of RunCommand. It's started asynchronously by services support // through `Start` method and is responsible for initializing all goroutines handling // concurrent, multi-runner execution of jobs. // When mr.stopSignal is broadcasted (after `Stop` is called by services support) // this method waits for all workers to be terminated and closes the mr.runFinished // channel, which is the signal that the command was properly terminated (this is the only // valid, properly terminated exit flow for `gitlab-runner run`). func (mr *RunCommand) run() { mr.setupMetricsAndDebugServer() mr.setupSessionServer() go mr.resetRunnerTokens() runners := make(chan *common.RunnerConfig) go mr.feedRunners(runners) mr.initUsedExecutorProviders() signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt) signal.Notify(mr.reloadSignal, syscall.SIGHUP) startWorker := make(chan int) stopWorker := make(chan bool) go mr.startWorkers(startWorker, stopWorker, runners) workerIndex := 0 // Update number of workers and reload configuration. // Exits when mr.runInterruptSignal receives a signal. for mr.stopSignal == nil { signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker) if signaled != nil { break } signaled = mr.updateConfig() if signaled != nil { break } } // Wait for workers to shut down mr.stopWorkers(stopWorker) mr.log().Info("All workers stopped.") mr.shutdownUsedExecutorProviders() mr.log().Info("All executor providers shut down.") close(mr.runFinished) mr.log().Info("Can exit now!") } func (mr *RunCommand) initUsedExecutorProviders() { mr.log().Info("Initializing executor providers") for _, provider := range common.GetExecutorProviders() { managedProvider, ok := provider.(common.ManagedExecutorProvider) if ok { managedProvider.Init() } } } func (mr *RunCommand) shutdownUsedExecutorProviders() { shutdownTimeout := mr.config.GetShutdownTimeout() logger := mr.log().WithField("shutdown-timeout", shutdownTimeout) logger.Info("Shutting down executor providers") ctx, cancelFn := context.WithTimeout(context.Background(), shutdownTimeout) defer cancelFn() wg := new(sync.WaitGroup) for _, provider := range common.GetExecutorProviders() { managedProvider, ok := provider.(common.ManagedExecutorProvider) if ok { wg.Add(1) go func(_ common.ManagedExecutorProvider) { defer wg.Done() managedProvider.Shutdown(ctx) }(managedProvider) } } wg.Wait() if ctx.Err() != nil { logger.Warn("Executor providers shutdown timeout exceeded") } } func (mr *RunCommand) setupMetricsAndDebugServer() { listenAddress, err := mr.listenAddress() if err != nil { mr.log().Errorf("invalid listen address: %s", err.Error()) return } if listenAddress == "" { mr.log().Info("listen_address not defined, metrics & debug endpoints disabled") return } // We separate out the listener creation here so that we can return an error if // the provided address is invalid or there is some other listener error. listener, err := net.Listen("tcp", listenAddress) if err != nil { mr.log().WithError(err).Fatal("Failed to create listener for metrics server") } mux := http.NewServeMux() go func() { err := http.Serve(listener, mux) if err != nil { mr.log().WithError(err).Fatal("Metrics server terminated") } }() mr.serveMetrics(mux) mr.serveDebugData(mux) mr.servePprof(mux) mr.log(). WithField("address", listenAddress). Info("Metrics server listening") } func (mr *RunCommand) serveMetrics(mux *http.ServeMux) { registry := prometheus.NewRegistry() // Metrics about the runner's business logic. registry.MustRegister(&mr.buildsHelper) // Metrics about runner workers health registry.MustRegister(&mr.healthHelper) // Metrics about configuration file accessing registry.MustRegister(mr.configAccessCollector) registry.MustRegister(mr) // Metrics about API connections registry.MustRegister(mr.apiRequestsCollector) // Metrics about jobs failures registry.MustRegister(mr.failuresCollector) // Metrics about catched errors registry.MustRegister(&mr.prometheusLogHook) // Metrics about the program's build version. registry.MustRegister(common.AppVersion.NewMetricsCollector()) // Go-specific metrics about the process (GC stats, goroutines, etc.). registry.MustRegister(collectors.NewGoCollector()) // Go-unrelated process metrics (memory usage, file descriptors, etc.). registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) // Register all executor provider collectors for _, provider := range common.GetExecutorProviders() { if collector, ok := provider.(prometheus.Collector); ok && collector != nil { registry.MustRegister(collector) } } // restrictHTTPMethods should be used on all promhttp handlers // In this specific instance, the handler is uninstrumented, so isn't as // important. But in the future, if any other promhttp handlers are added // they too should be wrapped and restriced. // https://gitlab.com/gitlab-org/gitlab-runner/-/issues/27194 mux.Handle( "/metrics", restrictHTTPMethods( promhttp.HandlerFor(registry, promhttp.HandlerOpts{}), http.MethodGet, http.MethodHead, ), ) } func (mr *RunCommand) serveDebugData(mux *http.ServeMux) { mux.HandleFunc("/debug/jobs/list", mr.buildsHelper.ListJobsHandler) } func (mr *RunCommand) servePprof(mux *http.ServeMux) { mux.HandleFunc("/debug/pprof/", pprof.Index) mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) mux.HandleFunc("/debug/pprof/profile", pprof.Profile) mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) } // restrictHTTPMethods wraps a http.Handler and returns a http.Handler that // restricts methods only to those provided. func restrictHTTPMethods(handler http.Handler, methods ...string) http.Handler { supported := map[string]struct{}{} for _, method := range methods { supported[method] = struct{}{} } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if _, ok := supported[r.Method]; !ok { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return } handler.ServeHTTP(w, r) }) } func (mr *RunCommand) setupSessionServer() { config := mr.getConfig() if config.SessionServer.ListenAddress == "" { mr.log().Info("[session_server].listen_address not defined, session endpoints disabled") return } // Create a wrapper function that handles the error from findSessionByURL findSessionWrapper := func(url string) *session.Session { sess, err := mr.buildsHelper.findSessionByURL(url) if err != nil { mr.log().WithError(err).WithField("url", url).Warn("Failed to find session by URL") return nil } return sess } var err error mr.sessionServer, err = session.NewServer( session.ServerConfig{ AdvertiseAddress: config.SessionServer.AdvertiseAddress, ListenAddress: config.SessionServer.ListenAddress, ShutdownTimeout: mr.config.GetShutdownTimeout(), }, mr.log(), certificate.X509Generator{}, findSessionWrapper, ) if err != nil { mr.log().WithError(err).Fatal("Failed to create session server") } go func() { err := mr.sessionServer.Start() if err != nil { mr.log().WithError(err).Fatal("Session server terminated") } }() mr.log(). WithField("address", config.SessionServer.ListenAddress). Info("Session server listening") } // feedRunners works until a stopSignal was saved. // It is responsible for feeding the runners (workers) to channel, which // asynchronously ends with job requests being made and jobs being executed // by concurrent workers. // This is also the place where check interval is calculated and // applied. func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) { for mr.stopSignal == nil { mr.log().Debugln("Feeding runners to channel") config := mr.getConfig() // If no runners wait full interval to test again if len(config.Runners) == 0 { time.Sleep(config.GetCheckInterval()) continue } interval := config.GetCheckInterval() / time.Duration(len(config.Runners)) // Feed runner with waiting exact amount of time for _, runner := range config.Runners { mr.feedRunner(runner, runners) time.Sleep(interval) } } mr.log(). WithField("StopSignal", mr.stopSignal). Debug("Stopping feeding runners to channel") } func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) { if !mr.healthHelper.isHealthy(runner) { mr.runnerWorkersFeedFailures.WithLabelValues(runner.ShortDescription(), runner.Name, runner.GetSystemID()).Inc() return } mr.runnerWorkersFeeds.WithLabelValues(runner.ShortDescription(), runner.Name, runner.GetSystemID()).Inc() mr.log().WithField("runner", runner.ShortDescription()).Debugln("Feeding runner to channel") runners <- runner } // startWorkers is responsible for starting the workers (up to the number // defined by `concurrent`) and assigning a runner processing method to them. func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) { for mr.stopSignal == nil { id := <-startWorker go mr.processRunners(id, stopWorker, runners) } } // processRunners is responsible for processing a Runner on a worker (when received // a runner information sent to the channel by feedRunners) and for terminating the worker // (when received an information on stoWorker chan - provided by updateWorkers) func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) { mr.log(). WithField("worker", id). Debugln("Starting worker") mr.runnerWorkerSlotOperations.WithLabelValues(workerSlotOperationStarted).Inc() for mr.stopSignal == nil { select { case runner := <-runners: err := mr.processRunner(id, runner, runners) if err != nil { logger := mr.log(). WithFields(logrus.Fields{ "runner": runner.ShortDescription(), "executor": runner.Executor, }).WithError(err) l, failureType := loggerAndFailureTypeFromError(logger, err) l("Failed to process runner") mr.runnerWorkerProcessingFailure. WithLabelValues(failureType, runner.ShortDescription(), runner.Name, runner.GetSystemID()). Inc() } case <-stopWorker: mr.log(). WithField("worker", id). Debugln("Stopping worker") mr.runnerWorkerSlotOperations.WithLabelValues(workerSlotOperationStopped).Inc() return } } <-stopWorker } func loggerAndFailureTypeFromError(logger logrus.FieldLogger, err error) (func(args ...interface{}), string) { var NoFreeExecutorError *common.NoFreeExecutorError if errors.As(err, &NoFreeExecutorError) { return logger.Debug, workerProcessingFailureNoFreeExecutor } var BuildError *common.BuildError if errors.As(err, &BuildError) { return logger.Debug, workerProcessingFailureJobFailure } return logger.Warn, workerProcessingFailureOther } // processRunner is responsible for handling one job on a specified runner. // First it acquires the Build to check if `limit` was met. If it's still in the capacity // it creates the debug session (for debug terminal), triggers a job request to configured // GitLab instance and finally creates and finishes the job. // To speed-up jobs handling before starting the job this method "requeues" the runner to another // worker (by feeding the channel normally handled by feedRunners). func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig) error { mr.log().WithField("runner", runner.ShortDescription()).Debugln("Processing runner") provider := common.GetExecutorProvider(runner.Executor) if provider == nil { mr.log(). WithField("runner", runner.ShortDescription()). Errorf("Executor %q is not known; marking Runner as unhealthy", runner.Executor) mr.healthHelper.markHealth(runner, false) return nil } mr.log().WithField("runner", runner.ShortDescription()).Debug("Acquiring job slot") if !mr.buildsHelper.acquireBuild(runner) { logrus.WithFields(logrus.Fields{ "runner": runner.ShortDescription(), "worker": id, }).Debug("Failed to request job, runner limit met") return nil } defer mr.buildsHelper.releaseBuild(runner) // Acquire request for job // We must ensure that this is released after the job request, or earlier if there's an // error before the job request is made. mr.log().WithField("runner", runner.ShortDescription()).Debug("Acquiring request slot") if !mr.buildsHelper.acquireRequest(runner) { mr.log().WithField("runner", runner.ShortDescription()). Debugln("Failed to request job: 'request_concurrency' already reached, see https://docs.gitlab.com/runner/configuration/advanced-configuration.html#the-runners-section") return nil } mr.log().WithField("runner", runner.ShortDescription()).Debug("Acquiring executor from provider") executorData, err := provider.Acquire(runner) if err != nil { // Release job request mr.buildsHelper.releaseRequest(runner, false) return fmt.Errorf("failed to update executor: %w", err) } defer provider.Release(runner, executorData) return mr.processBuildOnRunner(runner, runners, provider, executorData) } func (mr *RunCommand) processBuildOnRunner( runner *common.RunnerConfig, runners chan *common.RunnerConfig, provider common.ExecutorProvider, executorData common.ExecutorData, ) error { buildSession, sessionInfo, err := mr.createSession(provider) if err != nil { // Release job request mr.buildsHelper.releaseRequest(runner, false) return err } // Receive a new build trace, jobData, err := mr.requestJob(runner, sessionInfo) // Release job request mr.buildsHelper.releaseRequest(runner, jobData != nil) if err != nil || jobData == nil { return err } defer func() { mr.traceOutcome(trace, err) }() // Create a new build build, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData) if err != nil { return err } build.Session = buildSession build.ArtifactUploader = mr.network.UploadRawArtifacts trace.SetDebugModeEnabled(build.IsDebugModeEnabled()) // Add build to list of builds to assign numbers mr.buildsHelper.addBuild(build) fields := logrus.Fields{ "job": build.ID, "project": build.JobInfo.ProjectID, "repo_url": build.RepoCleanURL(), "time_in_queue_seconds": build.JobInfo.TimeInQueueSeconds, } mr.log().WithFields(fields).Infoln("Added job to processing list") defer func() { if mr.buildsHelper.removeBuild(build) { mr.log().WithFields(fields).Infoln("Removed job from processing list") mr.usageLoggerStore(usage_log.Record{ Runner: usage_log.Runner{ ID: runner.ShortDescription(), Name: runner.Name, SystemID: runner.GetSystemID(), }, Job: usage_log.Job{ URL: build.JobURL(), DurationSeconds: build.FinalDuration().Seconds(), Status: build.CurrentState().String(), FailureReason: build.FailureReason().String(), StartedAt: build.StartedAt().UTC(), FinishedAt: build.FinishedAt().UTC(), }, }) } }() // Process the same runner by different worker again // to speed up taking the builds mr.requeueRunner(runner, runners) // Process a build return build.Run(mr.getConfig(), trace) } func (mr *RunCommand) traceOutcome(trace common.JobTrace, err error) { if err != nil { fmt.Fprintln(trace, err.Error()) logTerminationError( mr.log(), "Fail", trace.Fail(err, common.JobFailureData{Reason: common.RunnerSystemFailure}), ) return } logTerminationError(mr.log(), "Success", trace.Success()) } func logTerminationError(logger logrus.FieldLogger, name string, err error) { if err != nil { logger.WithError(err).Errorf("Job trace termination %q failed", name) } } func (mr *RunCommand) usageLoggerStore(record usage_log.Record) { if mr.usageLogger == nil { return } l := mr.log().WithField("job_url", record.Job.URL) l.Info("Storing usage log information") err := mr.usageLogger.Store(record) if err != nil { l.WithError(err).Error("Failed to store usage log information") } } // createSession checks if debug server is supported by configured executor and if the // debug server was configured. If both requirements are met, then it creates a debug session // that will be assigned to newly created job. func (mr *RunCommand) createSession(provider common.ExecutorProvider) (*session.Session, *common.SessionInfo, error) { var features common.FeaturesInfo if err := provider.GetFeatures(&features); err != nil { return nil, nil, err } if mr.sessionServer == nil || !features.Session { return nil, nil, nil } sess, err := session.NewSession(mr.log()) if err != nil { return nil, nil, err } sessionInfo := &common.SessionInfo{ URL: mr.sessionServer.AdvertiseAddress + sess.Endpoint, Certificate: string(mr.sessionServer.CertificatePublicKey), Authorization: sess.Token, } return sess, sessionInfo, err } // requestJob will check if the runner can send another concurrent request to // GitLab, if not the return value is nil. func (mr *RunCommand) requestJob( runner *common.RunnerConfig, sessionInfo *common.SessionInfo, ) (common.JobTrace, *common.JobResponse, error) { jobData, healthy := mr.doJobRequest(context.Background(), runner, sessionInfo) mr.healthHelper.markHealth(runner, healthy) if jobData == nil { return nil, nil, nil } // Make sure to always close output jobCredentials := &common.JobCredentials{ ID: jobData.ID, Token: jobData.Token, } trace, err := mr.network.ProcessJob(*runner, jobCredentials) if err != nil { jobInfo := common.UpdateJobInfo{ ID: jobCredentials.ID, State: common.Failed, FailureReason: common.RunnerSystemFailure, } // send failure once mr.network.UpdateJob(*runner, jobCredentials, jobInfo) return nil, nil, err } if err := errors.Join(jobData.UnsupportedOptions(), jobData.ValidateStepsJobRequest(mr.executorSupportsNativeSteps(runner))); err != nil { _, _ = trace.Write([]byte(err.Error() + "\n")) err = trace.Fail(err, common.JobFailureData{ Reason: common.ConfigurationError, ExitCode: common.ExitCodeInvalidConfiguration, }) logTerminationError(mr.log(), "Fail", err) return nil, nil, err } trace.SetFailuresCollector(mr.failuresCollector) return trace, jobData, nil } func (mr *RunCommand) executorSupportsNativeSteps(runnerConfig *common.RunnerConfig) bool { netCli, ok := mr.network.(*network.GitLabClient) return ok && netCli.ExecutorSupportsNativeSteps(*runnerConfig) } // doJobRequest will execute the request for a new job, respecting an interruption // caused by interrupt signals or process execution finalization func (mr *RunCommand) doJobRequest( ctx context.Context, runner *common.RunnerConfig, sessionInfo *common.SessionInfo, ) (*common.JobResponse, bool) { // Terminate opened requests to GitLab when interrupt signal // is broadcast. ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() go func() { select { case <-mr.runInterruptSignal: cancelFn() case <-mr.runFinished: cancelFn() case <-ctx.Done(): } }() return mr.network.RequestJob(ctx, *runner, sessionInfo) } // requeueRunner feeds the runners channel in a non-blocking way. This replicates the // behavior of feedRunners and speeds-up jobs handling. But if the channel is full, the // method just exits without blocking. func (mr *RunCommand) requeueRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) { runnerLog := mr.log().WithField("runner", runner.ShortDescription()) select { case runners <- runner: runnerLog.Debugln("Requeued the runner") default: runnerLog.Debugln("Failed to requeue the runner") } } // updateWorkers, called periodically from run() is responsible for scaling the pool // of workers. By worker we don't understand a `[[runners]]` entry, but a "slot" that will // use one of the runners to request and handle a job. // The size of the workers pool is controlled by `concurrent` setting. This method is responsible // for the fact that `concurrent` defines the upper number of jobs that can be concurrently handled // by GitLab Runner process. func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal { config := mr.getConfig() concurrentLimit := config.Concurrent if concurrentLimit < 1 { mr.log().Fatalln(fmt.Printf( "Current configuration 'concurrent = %d' means that no jobs will be processed, see https://docs.gitlab.com/runner/configuration/advanced-configuration.html#the-global-section", concurrentLimit, )) } for mr.currentWorkers > concurrentLimit { // Too many workers. Trigger stop on one of them // or exit if termination signal was broadcasted. select { case stopWorker <- true: case signaled := <-mr.runInterruptSignal: return signaled } mr.currentWorkers-- mr.runnerWorkerSlots.Set(float64(mr.currentWorkers)) } for mr.currentWorkers < concurrentLimit { // Too few workers. Trigger a creation of a new one // or exit if termination signal was broadcasted. select { case startWorker <- *workerIndex: case signaled := <-mr.runInterruptSignal: return signaled } mr.currentWorkers++ mr.runnerWorkerSlots.Set(float64(mr.currentWorkers)) *workerIndex++ } return nil } func (mr *RunCommand) stopWorkers(stopWorker chan bool) { for mr.currentWorkers > 0 { stopWorker <- true mr.currentWorkers-- mr.runnerWorkerSlots.Set(float64(mr.currentWorkers)) } } func (mr *RunCommand) updateConfig() os.Signal { select { case <-time.After(mr.reloadConfigInterval): err := mr.checkConfig() if err != nil { mr.log().Errorln("Failed to load config", err) } case <-mr.reloadSignal: err := mr.reloadConfig() if err != nil { mr.log().Errorln("Failed to load config", err) } case signaled := <-mr.runInterruptSignal: return signaled } return nil } func (mr *RunCommand) checkConfig() (err error) { info, err := os.Stat(mr.ConfigFile) if err != nil { return err } config := mr.getConfig() if !config.ModTime.Before(info.ModTime()) { return nil } err = mr.reloadConfig() if err != nil { mr.log().Errorln("Failed to load config", err) // don't reload the same file config.ModTime = info.ModTime() return } return nil } // Stop is the method implementing `github.com/kardianos/service`.`Interface` // interface. It's responsible for triggering the process stop. // First it starts a goroutine that starts broadcasting the interrupt signal (used to stop // workers scaling goroutine). // Next it triggers graceful shutdown, which will be handled only if a proper signal is used. // At the end it triggers the forceful shutdown, which handles the forceful the process termination. func (mr *RunCommand) Stop(_ service.Service) error { if mr.stopSignal == nil { mr.stopSignal = os.Interrupt } go mr.interruptRun() defer func() { if mr.sessionServer != nil { mr.sessionServer.Close() } }() // On Windows, we convert SIGTERM and SIGINT signals into a SIGQUIT. // // This enforces *graceful* termination on the first signal received, and a forceful shutdown // on the second. // // This slightly differs from other operating systems. On other systems, receiving a SIGQUIT // works the same way (gracefully) but receiving a SIGTERM and SIGQUIT always results // in an immediate forceful shutdown. // // This handling has to be different as SIGQUIT is not a signal the os/signal package translates // any Windows control concepts to. if runtime.GOOS == "windows" { mr.stopSignal = syscall.SIGQUIT } err := mr.handleGracefulShutdown() if err == nil { return nil } mr.log(). WithError(err). Warning(`Graceful shutdown not finished properly. To gracefully clean up running plugins please use SIGQUIT (ctrl-\) instead of SIGINT (ctrl-c)`) err = mr.handleForcefulShutdown() if err == nil { return nil } mr.log(). WithError(err). Warning("Forceful shutdown not finished properly") mr.usageLoggerClose() return err } // interruptRun broadcasts interrupt signal, which exits the workers // scaling goroutine. func (mr *RunCommand) interruptRun() { mr.log().Debug("Broadcasting interrupt signal") // Pump interrupt signal for { mr.runInterruptSignal <- mr.stopSignal } } // handleGracefulShutdown is responsible for handling the "graceful" strategy of exiting. // It's executed only when specific signal is used to terminate the process. // At this moment feedRunners() should exit and workers scaling is being terminated. // This means that new jobs will be not requested. handleGracefulShutdown() will ensure that // the process will not exit until `mr.runFinished` is closed, so all jobs were finished and // all workers terminated. It may however exit if another signal - other than the gracefulShutdown // signal - is received. func (mr *RunCommand) handleGracefulShutdown() error { // We wait till we have a SIGQUIT for mr.stopSignal == syscall.SIGQUIT { mr.log(). WithField("StopSignal", mr.stopSignal). Warning("Starting graceful shutdown, waiting for builds to finish") // Wait for other signals to finish builds select { case mr.stopSignal = <-mr.stopSignals: // We received a new signal mr.log().WithField("stop-signal", mr.stopSignal).Warning("[handleGracefulShutdown] received stop signal") case <-mr.runFinished: // Everything finished we can exit now return nil } } return fmt.Errorf("received stop signal: %v", mr.stopSignal) } // handleForcefulShutdown is executed if handleGracefulShutdown exited with an error // (which means that a signal forcing shutdown was used instead of the signal // specific for graceful shutdown). // It calls mr.abortAllBuilds which will broadcast abort signal which finally // ends with jobs termination. // Next it waits for one of the following events: // 1. Another signal was sent to process, which is handled as force exit and // triggers exit of the method and finally process termination without // waiting for anything else. // 2. ShutdownTimeout is exceeded. If waiting for shutdown will take more than // defined time, the process will be forceful terminated just like in the // case when second signal is sent. // 3. mr.runFinished was closed, which means that all termination was done // properly. // // After this method exits, Stop returns it error and finally the // `github.com/kardianos/service` service mechanism will finish // process execution. func (mr *RunCommand) handleForcefulShutdown() error { mr.log(). WithField("shutdown-timeout", mr.config.GetShutdownTimeout()). WithField("StopSignal", mr.stopSignal). Warning("Starting forceful shutdown") go mr.abortAllBuilds() // Wait for graceful shutdown or abort after timeout for { select { case mr.stopSignal = <-mr.stopSignals: mr.log().WithField("stop-signal", mr.stopSignal).Warning("[handleForcefulShutdown] received stop signal") return fmt.Errorf("forced exit with stop signal: %v", mr.stopSignal) case <-time.After(mr.config.GetShutdownTimeout()): return errors.New("shutdown timed out") case <-mr.runFinished: // Everything finished we can exit now return nil } } } // abortAllBuilds broadcasts abort signal, which ends with all currently executed // jobs being interrupted and terminated. func (mr *RunCommand) abortAllBuilds() { mr.log().Debug("Broadcasting job abort signal") // Pump signal to abort all current builds for { mr.abortBuilds <- mr.stopSignal } } func (mr *RunCommand) usageLoggerClose() { if mr.usageLogger != nil { err := mr.usageLogger.Close() mr.usageLogger = nil mr.log().WithError(err).Error("Closing usage logger") } } func (mr *RunCommand) Execute(_ *cli.Context) { svcConfig := &service.Config{ Name: mr.ServiceName, DisplayName: mr.ServiceName, Description: defaultDescription, Arguments: []string{"run"}, Option: service.KeyValue{ "RunWait": mr.runWait, }, } svc, err := service_helpers.New(mr, svcConfig) if err != nil { logrus.WithError(err). Fatalln("Service creation failed") } if mr.Syslog { log.SetSystemLogger(logrus.StandardLogger(), svc) } mr.sentryLogHookMutex.Lock() logrus.AddHook(&mr.sentryLogHook) mr.sentryLogHookMutex.Unlock() logrus.AddHook(&mr.prometheusLogHook) err = svc.Run() if err != nil { logrus.WithError(err). Fatal("Service run failed") } } // runWait is the blocking mechanism for `github.com/kardianos/service` // service. It's started after Start exited and should block the control flow. When it exits, // then the Stop is executed and service shutdown should be handled. // For Runner it waits for the stopSignal to be received by the process. When it will happen, // it's saved in mr.stopSignal and runWait() exits, triggering the shutdown handling. func (mr *RunCommand) runWait() { mr.log().Debugln("Waiting for stop signal") // Save the stop signal and exit to execute Stop() stopSignal := <-mr.stopSignals mr.stopSignal = stopSignal mr.log().WithField("stop-signal", stopSignal).Warning("[runWait] received stop signal") } // Describe implements prometheus.Collector. func (mr *RunCommand) Describe(ch chan<- *prometheus.Desc) { ch <- concurrentDesc ch <- limitDesc mr.runnerWorkersFeeds.Describe(ch) mr.runnerWorkersFeedFailures.Describe(ch) mr.runnerWorkerSlots.Describe(ch) mr.runnerWorkerSlotOperations.Describe(ch) mr.runnerWorkerProcessingFailure.Describe(ch) } // Collect implements prometheus.Collector. func (mr *RunCommand) Collect(ch chan<- prometheus.Metric) { config := mr.getConfig() ch <- prometheus.MustNewConstMetric( concurrentDesc, prometheus.GaugeValue, float64(config.Concurrent), ) for _, runner := range config.Runners { ch <- prometheus.MustNewConstMetric( limitDesc, prometheus.GaugeValue, float64(runner.Limit), runner.ShortDescription(), runner.SystemIDState.GetSystemID(), ) } mr.runnerWorkersFeeds.Collect(ch) mr.runnerWorkersFeedFailures.Collect(ch) mr.runnerWorkerSlots.Collect(ch) mr.runnerWorkerSlotOperations.Collect(ch) mr.runnerWorkerProcessingFailure.Collect(ch) } func init() { apiRequestsCollector := network.NewAPIRequestsCollector() cmd := &RunCommand{ ServiceName: defaultServiceName, network: network.NewGitLabClientWithAPIRequestsCollector(apiRequestsCollector), apiRequestsCollector: apiRequestsCollector, prometheusLogHook: prometheus_helper.NewLogHook(), failuresCollector: prometheus_helper.NewFailuresCollector(), healthHelper: newHealthHelper(), buildsHelper: newBuildsHelper(), runAt: runAt, reloadConfigInterval: common.ReloadConfigInterval, } cmd.configAccessCollector = newConfigAccessCollector() common.RegisterCommand2("run", "run multi runner service", cmd) }