executors/docker/machine/provider.go (545 lines of code) (raw):

package machine

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"

	"gitlab.com/gitlab-org/gitlab-runner/common"
	"gitlab.com/gitlab-org/gitlab-runner/helpers/docker"
)

// machineProvider keeps track of Docker Machine hosts, hands them out to
// builds and delegates the remaining provider calls to the wrapped provider.
type machineProvider struct {
	name    string
	machine docker.Machine

	// details maps a machine name to its tracked state and runners maps a
	// runner token to its coordinator; both maps are accessed under lock.
	details machinesDetails
	runners runnersDetails
	lock    sync.RWMutex

	// acquireLock serializes concurrent Acquire calls.
	acquireLock sync.Mutex

	// provider is the real executor provider that is used to run the builds
	provider common.ExecutorProvider

	// stuckRemoveLock serializes the removal of machines that are stuck on
	// removal.
	stuckRemoveLock sync.Mutex

	// metrics
	totalActions            *prometheus.CounterVec
	currentStatesDesc       *prometheus.Desc
	creationHistogram       prometheus.Histogram
	stoppingHistogram       prometheus.Histogram
	removalHistogram        prometheus.Histogram
	failedCreationHistogram prometheus.Histogram
}

// machineDetails returns the tracked details for the named machine, creating
// the entry when it is not known yet. When acquire is true the machine is
// additionally acquired, and nil is returned if it is already in use.
func (m *machineProvider) machineDetails(name string, acquire bool) *machineDetails {
	details := m.ensureDetails(name)
	if acquire {
		details = m.tryAcquireMachineDetails(details)
	}

	return details
}

// ensureDetails returns the details entry registered under name, registering
// a fresh idle entry first when none exists.
func (m *machineProvider) ensureDetails(name string) *machineDetails {
	m.lock.Lock()
	defer m.lock.Unlock()

	details, ok := m.details[name]
	if !ok {
		now := time.Now()
		details = &machineDetails{
			Name:      name,
			Created:   now,
			Used:      now,
			LastSeen:  now,
			UsedCount: 1, // any machine that we find we mark as already used
			State:     machineStateIdle,
		}
		m.details[name] = details
	}

	return details
}

// errNoConfig is returned when a coordinator is requested without a runner
// configuration.
var errNoConfig = errors.New("no runner config specified")

// runnerMachinesCoordinator returns the coordinator registered for the token
// of the given runner config, creating it on first use. It returns
// errNoConfig when config is nil.
func (m *machineProvider) runnerMachinesCoordinator(config *common.RunnerConfig) (*runnerMachinesCoordinator, error) {
	if config == nil {
		return nil, errNoConfig
	}

	m.lock.Lock()
	defer m.lock.Unlock()

	details, ok := m.runners[config.GetToken()]
	if !ok {
		details = newRunnerMachinesCoordinator()
		m.runners[config.GetToken()] = details
	}

	return details, nil
}

// create registers a new machine in the Creating state and provisions it
// asynchronously. Once provisioning finished the machine is moved to state
// and the result (nil on success) is sent on the returned channel, which is
// buffered so the sender never blocks. When the machine cannot even be
// scheduled, the returned details are nil and the error is already waiting
// on the channel.
func (m *machineProvider) create(config *common.RunnerConfig, state machineState) (*machineDetails, chan error) {
	errCh := make(chan error, 1)

	// Resolve the coordinator before anything is registered, so that a failure
	// here doesn't leave behind a machine that is tracked as "creating" forever.
	coordinator, err := m.runnerMachinesCoordinator(config)
	if err != nil {
		errCh <- err
		return nil, errCh
	}

	name := newMachineName(config)
	details := m.machineDetails(name, true)
	if details == nil {
		// The name is already tracked by a machine that is in use.
		errCh <- fmt.Errorf("machine %q is already in use", name)
		return nil, errCh
	}

	details.Lock()
	details.State = machineStateCreating
	details.UsedCount = 0
	details.RetryCount = 0
	details.LastSeen = time.Now()
	details.Unlock()

	// Create machine with the required configuration asynchronously
	go coordinator.waitForGrowthCapacity(config.Machine.MaxGrowthRate, func() {
		m.createWithGrowthCapacity(coordinator, config, details, state, errCh)
	})

	return details, errCh
}

// createWithGrowthCapacity provisions the machine described by details and
// reports the outcome on errCh. On failure the failure metrics are recorded
// and removal of the machine is requested; on success the machine is moved
// to state, the creation metrics are recorded and the coordinator is told
// that another machine is available.
func (m *machineProvider) createWithGrowthCapacity(
	coordinator *runnerMachinesCoordinator,
	config *common.RunnerConfig,
	details *machineDetails,
	state machineState,
	errCh chan error,
) {
	logger := logrus.WithField("name", details.Name)
	started := time.Now()

	err := m.machine.Create(config.Machine.MachineDriver, details.Name, config.Machine.MachineOptions...)
	if err != nil {
		logger.WithField("time", time.Since(started)).
			WithError(err).
			Errorln("Machine creation failed")
		m.totalActions.WithLabelValues("creation-failed").Inc()
		m.failedCreationHistogram.Observe(time.Since(started).Seconds())
		_ = m.remove(details.Name, "Failed to create")
	} else {
		details.Lock()
		details.State = state
		details.Used = time.Now()
		retryCount := details.RetryCount
		details.Unlock()

		creationTime := time.Since(started)
		logger.WithField("duration", creationTime).
			WithField("now", time.Now()).
			WithField("retries", retryCount).
			Infoln("Machine created")
		m.totalActions.WithLabelValues("created").Inc()
		m.creationHistogram.Observe(creationTime.Seconds())

		// Signal that a new machine is available. When there's contention, there's no guarantee between the
		// ordering of reading from errCh and the availability check.
		coordinator.addAvailableMachine()
	}

	errCh <- err
}

// findFreeMachine acquires and returns the first machine from machines that
// is not in use and that can be connected to. Machines that were acquired
// but cannot be connected to are scheduled for removal. It returns nil when
// no machine qualifies.
func (m *machineProvider) findFreeMachine(skipCache bool, machines ...string) (details *machineDetails) {
	// Enumerate all machines in reverse order, to always take the newest machines first
	for idx := len(machines) - 1; idx >= 0; idx-- {
		name := machines[idx]

		candidate := m.machineDetails(name, true)
		if candidate == nil {
			continue
		}

		// Check if node is running
		if !m.machine.CanConnect(name, skipCache) {
			_ = m.remove(name, "machine is unavailable")
			continue
		}

		return candidate
	}

	return nil
}

// findFreeExistingMachine loads the machines that belong to config and tries
// to acquire a free one among them.
func (m *machineProvider) findFreeExistingMachine(config *common.RunnerConfig) (*machineDetails, error) {
	machines, err := m.loadMachines(config)
	if err != nil {
		return nil, err
	}

	return m.findFreeMachine(true, machines...), nil
}

// useMachine returns an acquired machine for config: a free existing one
// when available, a newly created one otherwise.
func (m *machineProvider) useMachine(config *common.RunnerConfig) (*machineDetails, error) {
	details, err := m.findFreeExistingMachine(config)
	if err != nil || details != nil {
		return details, err
	}

	return m.createAndAcquireMachine(config)
}

// createAndAcquireMachine starts the creation of a new machine and waits
// until either that machine or any other free machine could be acquired, or
// until an error occurs.
func (m *machineProvider) createAndAcquireMachine(config *common.RunnerConfig) (*machineDetails, error) {
	coordinator, err := m.runnerMachinesCoordinator(config)
	if err != nil {
		return nil, err
	}

	newDetails, errCh := m.create(config, machineStateIdle)

	// A single ticker is used for the periodic re-check instead of allocating
	// a new timer on every loop iteration.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Use either a free machine, or the created machine; whichever comes first. There's no guarantee that the created
	// machine can be used by us because between the time the machine is created, and the acquisition of the machine,
	// another goroutine may have found it via findFreeMachine and acquired it.
	var details *machineDetails
	for details == nil && err == nil {
		select {
		case err = <-errCh:
			if err != nil {
				return nil, err
			}

			details = m.tryAcquireMachineDetails(newDetails)
		case <-coordinator.availableMachineSignal():
			// Even though the signal is fired and we are *almost* sure that
			// there's a machine available, let's use the getAvailableMachine
			// method so that the internal counter is synchronized with what
			// we are actually doing and so that we can be sure that no other
			// goroutine that didn't accept the signal and instead used the ticker
			// hasn't already snatched a machine
			details, err = m.tryGetFreeExistingMachineFromCoordinator(config, coordinator)
		case <-ticker.C:
			details, err = m.tryGetFreeExistingMachineFromCoordinator(config, coordinator)
		}
	}

	return details, err
}

// tryGetFreeExistingMachineFromCoordinator looks for a free existing machine
// only when the coordinator reports an available one; otherwise it returns
// nil details and a nil error.
func (m *machineProvider) tryGetFreeExistingMachineFromCoordinator(
	config *common.RunnerConfig,
	coordinator *runnerMachinesCoordinator,
) (*machineDetails, error) {
	if coordinator.getAvailableMachine() {
		return m.findFreeExistingMachine(config)
	}

	return nil, nil
}

// tryAcquireMachineDetails marks the machine as acquired and returns it, or
// returns nil when the machine is already in use.
func (m *machineProvider) tryAcquireMachineDetails(details *machineDetails) *machineDetails {
	details.Lock()
	defer details.Unlock()

	if details.isUsed() {
		return nil
	}

	details.State = machineStateAcquired

	return details
}

// retryUseMachine calls useMachine up to three times, waiting
// provisionRetryInterval between attempts, and returns the outcome of the
// last attempt.
func (m *machineProvider) retryUseMachine(config *common.RunnerConfig) (details *machineDetails, err error) {
	const attempts = 3

	// Try to find a machine
	for i := 0; i < attempts; i++ {
		details, err = m.useMachine(config)
		if err == nil {
			break
		}

		// Only wait when another attempt follows; sleeping after the last
		// failure would just delay reporting the error.
		if i < attempts-1 {
			time.Sleep(provisionRetryInterval)
		}
	}

	return details, err
}

// removeMachine stops and removes the machine once. A failure to stop is
// only logged. A failure to remove increases the retry counter, waits
// removeRetryInterval and is returned to the caller. A machine that doesn't
// exist is skipped and reported as success.
func (m *machineProvider) removeMachine(details *machineDetails) (err error) {
	details.Lock()
	logger := details.logger()
	info := details.info()
	details.Unlock()

	if !m.machine.Exist(details.Name) {
		logger.Warningln("Skipping machine removal, because it doesn't exist")
		return nil
	}

	// This code limits amount of removal of stuck machines to one machine per interval
	if info.isStuckOnRemove() {
		m.stuckRemoveLock.Lock()
		defer m.stuckRemoveLock.Unlock()
	}

	logger.Warningln("Stopping machine")
	err = runHistogramCountedOperation(m.stoppingHistogram, func() error {
		return m.machine.Stop(details.Name, machineStopCommandTimeout)
	})
	if err != nil {
		logger.
			WithError(err).
			Warningln("Error while stopping machine")
	}

	logger.Warningln("Removing machine")
	err = runHistogramCountedOperation(m.removalHistogram, func() error {
		return m.machine.Remove(details.Name)
	})
	if err != nil {
		details.Lock()
		details.RetryCount++
		details.Unlock()

		time.Sleep(removeRetryInterval)

		return err
	}

	return nil
}

// runHistogramCountedOperation runs operation, records its duration in
// seconds in histogram regardless of the outcome, and returns the
// operation's error.
func runHistogramCountedOperation(histogram prometheus.Histogram, operation func() error) error {
	startedAt := time.Now()
	err := operation()
	histogram.Observe(time.Since(startedAt).Seconds())

	return err
}

// finalizeRemoval retries removeMachine until it succeeds, then drops the
// machine from the tracked details and records the removal.
func (m *machineProvider) finalizeRemoval(details *machineDetails) {
	for {
		if err := m.removeMachine(details); err == nil {
			break
		}
	}

	m.lock.Lock()
	defer m.lock.Unlock()
	delete(m.details, details.Name)

	// The logger is taken under the details lock, like everywhere else.
	details.Lock()
	retryCount := details.RetryCount
	logger := details.logger()
	details.Unlock()

	logger.
		WithField("now", time.Now()).
		WithField("retries", retryCount).
		Infoln("Machine removed")
	m.totalActions.WithLabelValues("removed").Inc()
}

// remove marks the named machine as Removing, stores the reason and starts
// the actual removal in a background goroutine. It returns an error when the
// machine is not tracked.
func (m *machineProvider) remove(machineName string, reason ...interface{}) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	details := m.details[machineName]
	if details == nil {
		return errors.New("machine not found")
	}

	now := time.Now()

	details.Lock()
	details.Reason = fmt.Sprint(reason...)
	details.State = machineStateRemoving
	details.RetryCount = 0

	details.logger().
		WithField("now", now).
		Warningln("Requesting machine removal")

	details.Used = now
	details.writeDebugInformation()
	details.Unlock()

	go m.finalizeRemoval(details)

	return nil
}

// updateMachines refreshes LastSeen for every listed machine, requests the
// removal of the machines that shouldRemoveIdle gives a reason for, and
// collects the per-state statistics. It returns the statistics together
// with the names of the machines that were kept.
func (m *machineProvider) updateMachines(
	machines []string,
	config *common.RunnerConfig,
) (data machinesData, validMachines []string) {
	data.Runner = config.ShortDescription()
	validMachines = make([]string, 0, len(machines))

	for _, name := range machines {
		details := m.machineDetails(name, false)
		details.Lock()
		details.LastSeen = time.Now()
		info := details.info()
		details.Unlock()

		reason := shouldRemoveIdle(config, &data, info)
		if reason == dontRemoveIdleMachine {
			validMachines = append(validMachines, name)
		} else {
			_ = m.remove(details.Name, reason)
		}

		// remove() above can mutate details, so we re-create info:
		details.Lock()
		info = details.info()
		details.Unlock()

		data.Add(info)
	}

	return data, validMachines
}

// createMachines starts goroutines that are creating the new machines.
// Limiting strategy is used to ensure the autoscaling parameters are respected.
func (m *machineProvider) createMachines(config *common.RunnerConfig, data *machinesData) { for { if !canCreateIdle(config, data) { return } // Create a new machine and mark it as Idle m.create(config, machineStateIdle) data.Creating++ } } // intermediateMachineList returns a list of machines that might not yet be // persisted on disk, these machines are the ones between being virtually // created, and `docker-machine create` getting executed we populate this data // set to overcome the race conditions related to not-full set of machines // returned by `docker-machine ls -q` func (m *machineProvider) intermediateMachineList(excludedMachines []string) []string { var excludedSet map[string]struct{} var intermediateMachines []string m.lock.Lock() defer m.lock.Unlock() for _, details := range m.details { details.Lock() persisted := details.isPersistedOnDisk() details.Unlock() if persisted { continue } // lazy init set, as most of times we don't create new machines if excludedSet == nil { excludedSet = make(map[string]struct{}, len(excludedMachines)) for _, excludedMachine := range excludedMachines { excludedSet[excludedMachine] = struct{}{} } } if _, ok := excludedSet[details.Name]; ok { continue } intermediateMachines = append(intermediateMachines, details.Name) } return intermediateMachines } func (m *machineProvider) loadMachines(config *common.RunnerConfig) (machines []string, err error) { machines, err = m.machine.List() if err != nil { return nil, err } machines = append(machines, m.intermediateMachineList(machines)...) 
machines = filterMachineList(machines, machineFilter(config)) return } func (m *machineProvider) Acquire(config *common.RunnerConfig) (common.ExecutorData, error) { if config.Machine == nil || config.Machine.MachineName == "" { return nil, fmt.Errorf("missing Machine options") } // Lock updating machines, because two Acquires can be run at the same time m.acquireLock.Lock() defer m.acquireLock.Unlock() machines, err := m.loadMachines(config) if err != nil { return nil, err } // Update a list of currently configured machines machinesData, validMachines := m.updateMachines(machines, config) // Pre-create machines m.createMachines(config, &machinesData) logger := logrus.WithFields(machinesData.Fields()). WithField("runner", config.ShortDescription()). WithField("idleCountMin", config.Machine.GetIdleCountMin()). WithField("idleCount", config.Machine.GetIdleCount()). WithField("idleScaleFactor", config.Machine.GetIdleScaleFactor()). WithField("maxMachines", config.Limit). WithField("maxMachineCreate", config.Machine.MaxGrowthRate) logger.WithField("time", time.Now()).Debugln("Docker Machine Details") machinesData.writeDebugInformation() // Try to find a free machine details := m.findFreeMachine(false, validMachines...) 
if details != nil { return details, nil } if config.Machine.GetIdleCount() == 0 && canCreateOnDemand(config, &machinesData) { logger.Debug("IdleCount is set to 0 so the machine will be created on demand in job context") } else if machinesData.Idle == 0 { return nil, &common.NoFreeExecutorError{Message: "no free machines that can process builds"} } return nil, nil } //nolint:nakedret func (m *machineProvider) Use( config *common.RunnerConfig, data common.ExecutorData, ) (newConfig common.RunnerConfig, newData common.ExecutorData, err error) { // Find a new machine details, _ := data.(*machineDetails) canBeUsed := false if details != nil { details.Lock() canBeUsed = details.canBeUsed() details.Unlock() } if !canBeUsed || !m.machine.CanConnect(details.Name, true) { details, err = m.retryUseMachine(config) if err != nil { return } // Return details only if this is a new instance newData = details } // Get machine credentials dc, err := m.machine.Credentials(details.Name) if err != nil { if newData != nil { m.Release(config, newData) } newData = nil return } // Create shallow copy of config and store in it docker credentials newConfig = *config newConfig.Docker = &common.DockerConfig{} if config.Docker != nil { *newConfig.Docker = *config.Docker } newConfig.Docker.Credentials = dc // Mark machine as used details.Lock() details.State = machineStateUsed details.Used = time.Now() details.UsedCount++ details.Unlock() m.totalActions.WithLabelValues("used").Inc() return } func (m *machineProvider) Release(config *common.RunnerConfig, data common.ExecutorData) { // Release machine details, ok := data.(*machineDetails) if !ok { return } details.Lock() // Mark last used time when is Used if details.State == machineStateUsed { details.Used = time.Now() } usedCount := details.UsedCount details.Unlock() // Remove machine if we already used it if config != nil && config.Machine != nil && config.Machine.MaxBuilds > 0 && usedCount >= config.Machine.MaxBuilds { err := 
m.remove(details.Name, "Too many builds") if err == nil { return } } details.Lock() details.State = machineStateIdle details.Unlock() // Signal pending builds that a new machine is available. if err := m.signalRelease(config); err != nil { return } } func (m *machineProvider) signalRelease(config *common.RunnerConfig) error { coordinator, err := m.runnerMachinesCoordinator(config) if err != nil && err != errNoConfig { return err } if err != errNoConfig && coordinator != nil { coordinator.addAvailableMachine() } return nil } func (m *machineProvider) CanCreate() bool { return m.provider.CanCreate() } func (m *machineProvider) GetFeatures(features *common.FeaturesInfo) error { return m.provider.GetFeatures(features) } func (m *machineProvider) GetConfigInfo(input *common.RunnerConfig, output *common.ConfigInfo) { m.provider.GetConfigInfo(input, output) } func (m *machineProvider) GetDefaultShell() string { return m.provider.GetDefaultShell() } func (m *machineProvider) Create() common.Executor { return &machineExecutor{ provider: m, } } func newMachineProvider() *machineProvider { name := "docker+machine" provider := common.GetExecutorProvider("docker") if provider == nil { logrus.Panicln("docker executor provider not initialized") } return &machineProvider{ name: name, details: make(machinesDetails), runners: make(runnersDetails), machine: docker.NewMachineCommand(), provider: provider, totalActions: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitlab_runner_autoscaling_actions_total", Help: "The total number of actions executed by the provider.", ConstLabels: prometheus.Labels{ "executor": name, }, }, []string{"action"}, ), currentStatesDesc: prometheus.NewDesc( "gitlab_runner_autoscaling_machine_states", "The current number of machines per state in this provider.", []string{"state"}, prometheus.Labels{ "executor": name, }, ), creationHistogram: prometheus.NewHistogram( prometheus.HistogramOpts{ Name: 
"gitlab_runner_autoscaling_machine_creation_duration_seconds", Help: "Histogram of machine creation time.", Buckets: prometheus.ExponentialBuckets(30, 1.25, 10), ConstLabels: prometheus.Labels{ "executor": name, }, }, ), stoppingHistogram: prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "gitlab_runner_autoscaling_machine_stopping_duration_seconds", Help: "Histogram of machine stopping time.", Buckets: []float64{1, 3, 5, 10, 30, 50, 60, 80, 90, 120}, ConstLabels: prometheus.Labels{ "executor": name, }, }, ), removalHistogram: prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "gitlab_runner_autoscaling_machine_removal_duration_seconds", Help: "Histogram of machine removal time.", Buckets: []float64{1, 3, 5, 10, 30, 50, 60, 80, 90, 120}, ConstLabels: prometheus.Labels{ "executor": name, }, }, ), failedCreationHistogram: prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "gitlab_runner_autoscaling_machine_failed_creation_duration_seconds", Help: "Histogram of machine failed creation timings", Buckets: []float64{1, 3, 5, 10, 30, 50, 60, 80, 90, 120}, ConstLabels: prometheus.Labels{ "executor": name, }, }, ), } }