executors/docker/machine/coordinator.go (55 lines of code) (raw):
package machine
import (
"sync"
)
// runnerMachinesCoordinator tracks the status of a specific Machine configuration. It limits how many
// machines may be provisioned concurrently and keeps a count of the machines that are available.
type runnerMachinesCoordinator struct {
	// growthCondLock guards growing; newRunnerMachinesCoordinator builds growthCond on top of it.
	growthCondLock sync.Mutex
	// growthCond wakes goroutines blocked in waitForGrowthCapacity when growing decreases.
	growthCond *sync.Cond
	// growing is the number of waitForGrowthCapacity callbacks currently running.
	growing int

	// availableLock guards available.
	availableLock sync.Mutex
	// available is incremented by addAvailableMachine and decremented by getAvailableMachine.
	available uint
	// availableSignal receives a non-blocking send on every addAvailableMachine call.
	availableSignal chan struct{}
}
// newRunnerMachinesCoordinator returns a coordinator with zeroed counters, an unbuffered
// availableSignal channel and a growthCond bound to the coordinator's own growthCondLock.
func newRunnerMachinesCoordinator() *runnerMachinesCoordinator {
	c := &runnerMachinesCoordinator{
		availableSignal: make(chan struct{}),
	}
	// The Cond needs the address of the mutex stored inside c, so it is wired up
	// only after c has been allocated.
	c.growthCond = sync.NewCond(&c.growthCondLock)
	return c
}
// waitForGrowthCapacity blocks until fewer than maxGrowth callbacks admitted through this
// coordinator are still running, then runs f and returns once f has finished.
//
// A maxGrowth of zero or less means "no limit": f is started immediately. A negative value
// can never be satisfied by the counter, so it must not be compared against it.
//
// The slot taken for f is released when f returns or panics.
func (r *runnerMachinesCoordinator) waitForGrowthCapacity(maxGrowth int, f func()) {
	r.growthCondLock.Lock()
	for maxGrowth > 0 && r.growing >= maxGrowth {
		r.growthCond.Wait()
	}
	r.growing++
	r.growthCondLock.Unlock()

	// Deferred so the slot is given back even if f panics.
	defer func() {
		r.growthCondLock.Lock()
		r.growing--
		r.growthCondLock.Unlock()

		// Every waiter compares growing against its own maxGrowth. Waking just one
		// could pick a goroutine that still has to wait while another that could
		// proceed stays asleep, so wake them all and let each re-check its condition.
		r.growthCond.Broadcast()
	}()

	f()
}
// getAvailableMachine reports whether a machine is available and, if so, claims it by
// decrementing the internal counter. Consecutive calls may therefore return different
// results; once the counter reaches zero it returns false and leaves the counter untouched.
func (r *runnerMachinesCoordinator) getAvailableMachine() bool {
	r.availableLock.Lock()
	defer r.availableLock.Unlock()

	claimed := r.available > 0
	if claimed {
		r.available--
	}
	return claimed
}
// addAvailableMachine records one more available machine by incrementing the counter
// that getAvailableMachine consumes, then tries to notify a listener on availableSignal.
func (r *runnerMachinesCoordinator) addAvailableMachine() {
	r.availableLock.Lock()
	defer r.availableLock.Unlock()

	r.available++

	// The send must never block: when it cannot proceed right away the
	// notification is skipped, and the machine is still recorded in the counter.
	var wake struct{}
	select {
	case r.availableSignal <- wake:
	default:
	}
}
// availableMachineSignal exposes the receive-only side of the channel that
// addAvailableMachine sends on.
func (r *runnerMachinesCoordinator) availableMachineSignal() <-chan struct{} {
	var signal <-chan struct{} = r.availableSignal
	return signal
}
// runnersDetails maps a string key to the runnerMachinesCoordinator kept for it.
// NOTE(review): the key presumably identifies a runner / Machine configuration — confirm against callers.
type runnersDetails map[string]*runnerMachinesCoordinator