// executors/internal/autoscaler/provider.go
package autoscaler
import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/fleeting/fleeting"
"gitlab.com/gitlab-org/fleeting/fleeting-artifact/pkg/installer"
"gitlab.com/gitlab-org/fleeting/fleeting/connector"
flprometheus "gitlab.com/gitlab-org/fleeting/fleeting/metrics/prometheus"
fleetingprovider "gitlab.com/gitlab-org/fleeting/fleeting/provider"
"gitlab.com/gitlab-org/fleeting/nesting/api"
"gitlab.com/gitlab-org/fleeting/nesting/hypervisor"
"gitlab.com/gitlab-org/fleeting/taskscaler"
tsprometheus "gitlab.com/gitlab-org/fleeting/taskscaler/metrics/prometheus"
"gitlab.com/gitlab-org/fleeting/taskscaler/storage"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/executors/internal/autoscaler/logger"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/featureflags"
)
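
// compile-time checks that provider satisfies these interfaces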
var (
_ prometheus.Collector = &provider{}
_ common.ManagedExecutorProvider = &provider{}
)
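
// fleetingPlugin is the subset of a running fleeting plugin's interface the
// provider needs; tests substitute a fake implementation.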
type fleetingPlugin interface {
InstanceGroup() fleetingprovider.InstanceGroup
Kill()
}
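
// provider decorates an ExecutorProvider with autoscaling, maintaining one
// scaler (a taskscaler plus its fleeting plugin) per runner token.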
type provider struct {
common.ExecutorProvider
cfg Config
mu sync.Mutex
scalers map[string]scaler
	// Testing hooks: overridden in tests to stub external dependencies.
taskscalerNew func(context.Context, fleetingprovider.InstanceGroup, ...taskscaler.Option) (taskscaler.Taskscaler, error)
fleetingRunPlugin func(string, []byte, ...fleeting.PluginOption) (fleetingPlugin, error)
generateUniqueID func() (string, error)
}
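
// scaler pairs a running taskscaler with its shutdown routine and records
// when the originating runner config was loaded, to detect reloads.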
type scaler struct {
internal taskscaler.Taskscaler
shutdown func(context.Context)
configLoadedAt time.Time
}
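
// Config holds settings for the autoscaler provider.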
type Config struct {
	// MapJobImageToVMImage allows the job-defined image to control the VM
	// image used.
//
// Examples:
// - For "instance" executor and VM Isolation enabled: the job image defines
// what nested VM is used on the host. We want to map the job image to
// the VM image.
// - For "docker" executor and VM Isolation enabled: the job image defines what
// container is used, inside the nested VM, on the host. We *don't* want
// to map the job image to the VM image.
MapJobImageToVMImage bool
}
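
// New returns an ExecutorProvider that wraps ep with fleeting/taskscaler
// based autoscaling.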
func New(ep common.ExecutorProvider, cfg Config) common.ExecutorProvider {
return &provider{
ExecutorProvider: ep,
cfg: cfg,
scalers: make(map[string]scaler),
taskscalerNew: taskscaler.New,
fleetingRunPlugin: func(name string, config []byte, opts ...fleeting.PluginOption) (fleetingPlugin, error) {
pluginPath, err := installer.LookPath(name, "")
if err != nil {
return nil, fmt.Errorf("loading fleeting plugin: %w", err)
}
return fleeting.RunPlugin(pluginPath, config, opts...)
},
generateUniqueID: func() (string, error) {
return helpers.GenerateRandomUUID(8)
},
}
}
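
// Init implements common.ManagedExecutorProvider. Scalers are created
// lazily by Acquire, so there is nothing to initialize here.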
func (p *provider) Init() {}
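
// Shutdown implements common.ManagedExecutorProvider, shutting down all
// scalers concurrently and waiting for them to finish.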
func (p *provider) Shutdown(ctx context.Context) {
p.mu.Lock()
defer p.mu.Unlock()
wg := new(sync.WaitGroup)
for key, s := range p.scalers {
wg.Add(1)
go func(sc scaler) {
defer wg.Done()
sc.shutdown(ctx)
}(s)
delete(p.scalers, key)
}
wg.Wait()
}
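
// init returns the taskscaler for the given runner config, creating it,
// together with its fleeting plugin, on first use. The returned bool
// reports whether the runner config has been reloaded since the scaler was
// created (or the scaler is new), in which case the caller should reapply
// the autoscaling policy.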
//nolint:gocognit
func (p *provider) init(config *common.RunnerConfig) (taskscaler.Taskscaler, bool, error) {
	if config.Autoscaler == nil {
		return nil, false, errors.New("executor requires autoscaler config")
	}
p.mu.Lock()
defer p.mu.Unlock()
	s, ok := p.scalers[config.GetToken()]
	if ok {
		// detect whether the config has been reloaded; persist the new
		// timestamp so a reload is only reported once
		refresh := s.configLoadedAt != config.ConfigLoadedAt
		s.configLoadedAt = config.ConfigLoadedAt
		p.scalers[config.GetToken()] = s
		return s.internal, refresh, nil
	}
pluginCfg, err := config.Autoscaler.PluginConfig.JSON()
if err != nil {
return nil, false, fmt.Errorf("marshaling plugin config: %w", err)
}
logger := logger.New(config.RunnerCredentials.Log())
var store storage.Storage
if config.Autoscaler.StateStorage.Enabled {
dir := config.Autoscaler.StateStorage.Dir
if dir == "" {
dir = filepath.Join(config.ConfigDir, ".taskscaler")
}
store, err = storage.NewFileStorage(filepath.Join(dir, helpers.ShortenToken(config.Token)))
if err != nil {
return nil, false, fmt.Errorf("creating state storage: %w", err)
}
}
runner, err := p.fleetingRunPlugin(config.Autoscaler.Plugin, pluginCfg, fleeting.WithPluginLogger(logger.Named("fleeting-plugin")))
if err != nil {
return nil, false, fmt.Errorf("running autoscaler plugin: %w", err)
}
instanceConnectConfig := fleetingprovider.ConnectorConfig{
OS: config.Autoscaler.ConnectorConfig.OS,
Arch: config.Autoscaler.ConnectorConfig.Arch,
Protocol: fleetingprovider.Protocol(config.Autoscaler.ConnectorConfig.Protocol),
ProtocolPort: config.Autoscaler.ConnectorConfig.ProtocolPort,
Username: config.Autoscaler.ConnectorConfig.Username,
Password: config.Autoscaler.ConnectorConfig.Password,
UseStaticCredentials: config.Autoscaler.ConnectorConfig.UseStaticCredentials,
Keepalive: config.Autoscaler.ConnectorConfig.Keepalive,
Timeout: config.Autoscaler.ConnectorConfig.Timeout,
}
if config.Autoscaler.ConnectorConfig.KeyPathname != "" {
key, err := os.ReadFile(config.Autoscaler.ConnectorConfig.KeyPathname)
if err != nil {
runner.Kill()
return nil, false, fmt.Errorf("reading instance group connector key: %w", err)
}
instanceConnectConfig.Key = key
}
constLabels := prometheus.Labels{
"runner": config.ShortDescription(),
"runner_name": config.Name,
"system_id": config.GetSystemID(),
}
tsMC := tsprometheus.New(
tsprometheus.WithConstLabels(constLabels),
tsprometheus.WithInstanceReadinessTimeBuckets(config.Autoscaler.GetInstanceReadinessTimeBuckets()),
)
flMC := flprometheus.New(
flprometheus.WithConstLabels(constLabels),
flprometheus.WithInstanceCreationTimeBuckets(config.Autoscaler.GetInstanceCreationTimeBuckets()),
flprometheus.WithInstanceIsRunningTimeBuckets(config.Autoscaler.GetInstanceIsRunningTimeBuckets()),
flprometheus.WithInstanceDeletionTimeBuckets(config.Autoscaler.GetInstanceDeletionTimeBuckets()),
flprometheus.WithInstanceLifeDurationBuckets(config.Autoscaler.InstanceLifeDurationBuckets),
)
shutdownCtx, shutdownFn := context.WithCancel(context.Background())
options := []taskscaler.Option{
taskscaler.WithReservations(),
taskscaler.WithCapacityPerInstance(config.Autoscaler.CapacityPerInstance),
taskscaler.WithMaxUseCount(config.Autoscaler.MaxUseCount),
taskscaler.WithMaxInstances(config.Autoscaler.MaxInstances),
taskscaler.WithInstanceGroupSettings(fleetingprovider.Settings{
ConnectorConfig: instanceConnectConfig,
}),
taskscaler.WithMetricsCollector(tsMC),
taskscaler.WithFleetingMetricsCollector(flMC),
taskscaler.WithInstanceUpFunc(instanceReadyUp(shutdownCtx, config)),
taskscaler.WithUpdateInterval(time.Minute),
taskscaler.WithUpdateIntervalWhenExpecting(time.Second),
taskscaler.WithLogger(logger.Named("taskscaler")),
taskscaler.WithScaleThrottle(config.Autoscaler.ScaleThrottle.Limit, config.Autoscaler.ScaleThrottle.Burst),
}
if config.IsFeatureFlagOn(featureflags.UseFleetingAcquireHeartbeats) {
options = append(options, taskscaler.WithHeartbeatFunc(instanceHeartbeat(config)))
}
if store != nil {
options = append(options, taskscaler.WithStorage(store))
}
if config.Autoscaler.UpdateInterval > 0 {
options = append(options, taskscaler.WithUpdateInterval(config.Autoscaler.UpdateInterval))
}
if config.Autoscaler.UpdateIntervalWhenExpecting > 0 {
options = append(options, taskscaler.WithUpdateIntervalWhenExpecting(config.Autoscaler.UpdateIntervalWhenExpecting))
}
if config.Autoscaler.DeleteInstancesOnShutdown {
options = append(options, taskscaler.WithDeleteInstancesOnShutdown())
}
ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancelFn()
ts, err := p.taskscalerNew(ctx, runner.InstanceGroup(), options...)
if err != nil {
shutdownFn()
runner.Kill()
return nil, false, fmt.Errorf("creating taskscaler: %w", err)
}
s = scaler{
internal: ts,
shutdown: func(ctx context.Context) {
shutdownFn()
ts.Shutdown(ctx)
runner.Kill()
},
configLoadedAt: config.ConfigLoadedAt,
}
p.scalers[config.GetToken()] = s
return s.internal, true, nil
}
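
// Acquire initializes the runner's taskscaler if needed, reapplies the
// autoscaling policy after a config reload, and reserves capacity for a
// job, returning an acquisition reference. If no capacity is available it
// returns a common.NoFreeExecutorError.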
//nolint:gocognit
func (p *provider) Acquire(config *common.RunnerConfig) (common.ExecutorData, error) {
scaler, refresh, err := p.init(config)
if err != nil {
return nil, fmt.Errorf("initializing taskscaler: %w", err)
}
// reconfigure policy if the config has been reloaded
if refresh {
var schedules []taskscaler.Schedule
for _, schedule := range config.Autoscaler.Policy {
schedules = append(schedules, taskscaler.Schedule{
Periods: schedule.Periods,
Timezone: schedule.Timezone,
IdleCount: schedule.IdleCount,
IdleTime: schedule.IdleTime,
ScaleFactor: schedule.ScaleFactor,
ScaleFactorLimit: schedule.ScaleFactorLimit,
PreemptiveMode: schedule.IdleCount > 0,
})
}
if err := scaler.ConfigureSchedule(schedules...); err != nil {
return nil, fmt.Errorf("configuring taskscaler schedules: %w", err)
}
}
// generate key for acquisition
key, err := p.generateUniqueID()
if err != nil {
return nil, fmt.Errorf("generating unique id for task acquisition: %w", err)
}
key = helpers.ShortenToken(config.Token) + key
if err := scaler.Reserve(key); err != nil {
if errors.Is(err, taskscaler.ErrNoCapacity) {
err = &common.NoFreeExecutorError{Message: fmt.Sprintf("reserving taskscaler capacity: %v", err)}
}
return nil, err
}
logrus.WithField("key", key).Trace("Reserved capacity...")
return newAcquisitionRef(key, p.cfg.MapJobImageToVMImage), nil
}
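
// Release returns a job's capacity to the runner's taskscaler: an
// acquisition that was bound to an instance is released, otherwise the
// outstanding reservation is removed.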
func (p *provider) Release(config *common.RunnerConfig, data common.ExecutorData) {
acqRef, ok := data.(*acquisitionRef)
if !ok {
return
}
if acqRef.acq != nil {
p.getRunnerTaskscaler(config).Release(acqRef.key)
logrus.WithField("key", acqRef.key).Trace("Released capacity...")
acqRef.acq = nil
return
}
p.getRunnerTaskscaler(config).Unreserve(acqRef.key)
logrus.WithField("key", acqRef.key).Trace("Unreserved capacity...")
}
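
// Create wraps the executor from the underlying provider so it can reach
// this provider's taskscalers.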
func (p *provider) Create() common.Executor {
e := p.ExecutorProvider.Create()
if e == nil {
return nil
}
return &executor{
provider: p,
Executor: e,
}
}
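
// getRunnerTaskscaler returns the taskscaler previously created for the
// runner's token, or nil if init has not run for this runner yet.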
func (p *provider) getRunnerTaskscaler(config *common.RunnerConfig) taskscaler.Taskscaler {
p.mu.Lock()
defer p.mu.Unlock()
return p.scalers[config.GetToken()].internal
}
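
// Describe implements prometheus.Collector, forwarding to each scaler's
// taskscaler and fleeting metric collectors.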
func (p *provider) Describe(ch chan<- *prometheus.Desc) {
	// guard the scalers map against concurrent mutation by init and Shutdown
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, scaler := range p.scalers {
c, ok := scaler.internal.MetricsCollector().(prometheus.Collector)
if ok {
c.Describe(ch)
}
c, ok = scaler.internal.FleetingMetricsCollector().(prometheus.Collector)
if ok {
c.Describe(ch)
}
}
}
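
// Collect implements prometheus.Collector, forwarding to each scaler's
// taskscaler and fleeting metric collectors.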
func (p *provider) Collect(ch chan<- prometheus.Metric) {
	// guard the scalers map against concurrent mutation by init and Shutdown
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, scaler := range p.scalers {
c, ok := scaler.internal.MetricsCollector().(prometheus.Collector)
if ok {
c.Collect(ch)
}
c, ok = scaler.internal.FleetingMetricsCollector().(prometheus.Collector)
if ok {
c.Collect(ch)
}
}
}
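
// instanceReadyUp returns the taskscaler UpFunc that prepares and validates
// an instance before use: stale acquisitions are released, untrusted
// pre-existing instances are rejected, the optional instance-ready command
// is run, and, when VM Isolation is enabled, the nesting host is cleaned up.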
//nolint:gocognit
func instanceReadyUp(ctx context.Context, config *common.RunnerConfig) taskscaler.UpFunc {
return func(ts taskscaler.Taskscaler, instance taskscaler.UpFuncInstance) error {
if len(instance.Acquisitions) > 0 {
			// We currently have no way to resume acquisitions, so for now we
			// remove them.
			for _, key := range instance.Acquisitions {
				ts.Release(key)
			}
			if !config.Autoscaler.StateStorage.KeepInstanceWithAcquisitions {
				return fmt.Errorf("pre-existing instance has acquisitions, removing for safety")
			}
}
		// If the instance pre-existed, wasn't restored from saved state, and
		// VM Isolation is not enabled, we have no data on it and cannot
		// trust it.
		if instance.Cause == fleeting.CausePreexisted &&
			!instance.Restored && !config.Autoscaler.VMIsolation.Enabled {
			return fmt.Errorf("no data on pre-existing instance, removing for safety")
		}
useExternalAddr := true
if config.Autoscaler != nil {
useExternalAddr = config.Autoscaler.ConnectorConfig.UseExternalAddr
}
		// run the configured instance-ready command on the instance, if any
if config.Autoscaler.InstanceReadyCommand != "" {
err := connector.Run(ctx, instance.Info, connector.ConnectorOptions{
RunOptions: connector.RunOptions{
Command: config.Autoscaler.InstanceReadyCommand,
},
DialOptions: connector.DialOptions{
UseExternalAddr: useExternalAddr,
},
})
if err != nil {
return fmt.Errorf("ready command: %w", err)
}
}
if !config.Autoscaler.VMIsolation.Enabled {
return nil
}
return readyNestingHost(ctx, config, instance, useExternalAddr)
}
}
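
// readyNestingHost dials the nesting daemon running on the instance and
// deletes any leftover nested VMs, since they cannot yet be reattached to.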
func readyNestingHost(ctx context.Context, config *common.RunnerConfig, instance taskscaler.UpFuncInstance, useExternalAddr bool) error {
// dial host
dialer, err := connector.Dial(ctx, instance.Info, connector.DialOptions{
UseExternalAddr: useExternalAddr,
})
if err != nil {
return fmt.Errorf("dialing host: %w", err)
}
defer dialer.Close()
conn, err := api.NewClientConn(config.Autoscaler.VMIsolation.NestingHost, func(ctx context.Context, network, address string) (net.Conn, error) {
return dialer.Dial(network, address)
})
if err != nil {
return fmt.Errorf("dialing nesting daemon: %w", err)
}
nc := api.New(conn)
defer nc.Close()
var vms []hypervisor.VirtualMachine
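	// withInit (defined elsewhere in this package) ensures the nesting
	// daemon has been initialized before the call is made.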
err = withInit(ctx, config, nc, func() error {
vms, err = nc.List(ctx)
return err
})
if err != nil {
return fmt.Errorf("listing existing vms: %w", err)
}
	// We can't yet reattach to existing VMs, so for now we attempt to delete
	// them. If deletion fails for some reason, these VMs can be stomped by
	// new jobs anyway.
for _, vm := range vms {
_ = nc.Delete(ctx, vm.GetId())
}
return nil
}
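
// instanceHeartbeat returns the taskscaler HeartbeatFunc that checks an
// instance is still reachable by running a trivial command over the
// connector.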
func instanceHeartbeat(config *common.RunnerConfig) taskscaler.HeartbeatFunc {
useExternalAddr := true
if config.Autoscaler != nil {
useExternalAddr = config.Autoscaler.ConnectorConfig.UseExternalAddr
}
return func(ctx context.Context, info fleetingprovider.ConnectInfo) error {
return connector.Run(ctx, info, connector.ConnectorOptions{
RunOptions: connector.RunOptions{
Command: "exit 0",
},
DialOptions: connector.DialOptions{
UseExternalAddr: useExternalAddr,
},
})
}
}