in executors/internal/autoscaler/provider.go [117:261]
// init returns the taskscaler associated with the runner's token, creating it
// on first use.
//
// The boolean result reports whether the caller should treat the scaler as
// needing a (re)configuration: it is true when a new scaler was just created,
// or when the cached scaler was last seen with a different ConfigLoadedAt than
// the one carried by config (i.e. the configuration has been reloaded since).
//
// An error is returned when config has no autoscaler section, when the plugin
// configuration cannot be marshaled, when state storage cannot be created,
// when the fleeting plugin cannot be started, when the connector key file
// cannot be read, or when the taskscaler itself cannot be created. On the
// error paths that occur after the plugin has been started, the plugin is
// killed before returning.
func (p *provider) init(config *common.RunnerConfig) (taskscaler.Taskscaler, bool, error) {
	if config.Autoscaler == nil {
		return nil, false, fmt.Errorf("executor requires autoscaler config")
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	token := config.GetToken()

	s, ok := p.scalers[token]
	if ok {
		// detect if the config has been reloaded
		refresh := s.configLoadedAt != config.ConfigLoadedAt

		// The map stores scaler values, so s is a copy: the updated timestamp
		// must be written back, otherwise every call after the first reload
		// would keep reporting refresh == true.
		s.configLoadedAt = config.ConfigLoadedAt
		p.scalers[token] = s

		return s.internal, refresh, nil
	}

	pluginCfg, err := config.Autoscaler.PluginConfig.JSON()
	if err != nil {
		return nil, false, fmt.Errorf("marshaling plugin config: %w", err)
	}

	logger := logger.New(config.RunnerCredentials.Log())

	// Optional state storage: a per-runner directory (named after the
	// shortened token) under the configured dir, or under
	// <ConfigDir>/.taskscaler when no dir is configured.
	var store storage.Storage
	if config.Autoscaler.StateStorage.Enabled {
		dir := config.Autoscaler.StateStorage.Dir
		if dir == "" {
			dir = filepath.Join(config.ConfigDir, ".taskscaler")
		}

		store, err = storage.NewFileStorage(filepath.Join(dir, helpers.ShortenToken(config.Token)))
		if err != nil {
			return nil, false, fmt.Errorf("creating state storage: %w", err)
		}
	}

	runner, err := p.fleetingRunPlugin(config.Autoscaler.Plugin, pluginCfg, fleeting.WithPluginLogger(logger.Named("fleeting-plugin")))
	if err != nil {
		return nil, false, fmt.Errorf("running autoscaler plugin: %w", err)
	}

	instanceConnectConfig := fleetingprovider.ConnectorConfig{
		OS:                   config.Autoscaler.ConnectorConfig.OS,
		Arch:                 config.Autoscaler.ConnectorConfig.Arch,
		Protocol:             fleetingprovider.Protocol(config.Autoscaler.ConnectorConfig.Protocol),
		ProtocolPort:         config.Autoscaler.ConnectorConfig.ProtocolPort,
		Username:             config.Autoscaler.ConnectorConfig.Username,
		Password:             config.Autoscaler.ConnectorConfig.Password,
		UseStaticCredentials: config.Autoscaler.ConnectorConfig.UseStaticCredentials,
		Keepalive:            config.Autoscaler.ConnectorConfig.Keepalive,
		Timeout:              config.Autoscaler.ConnectorConfig.Timeout,
	}

	if config.Autoscaler.ConnectorConfig.KeyPathname != "" {
		key, err := os.ReadFile(config.Autoscaler.ConnectorConfig.KeyPathname)
		if err != nil {
			// The plugin is already running at this point; don't leave it behind.
			runner.Kill()
			return nil, false, fmt.Errorf("reading instance group connector key: %w", err)
		}
		instanceConnectConfig.Key = key
	}

	// Labels shared by the taskscaler and fleeting metrics collectors.
	constLabels := prometheus.Labels{
		"runner":      config.ShortDescription(),
		"runner_name": config.Name,
		"system_id":   config.GetSystemID(),
	}

	tsMC := tsprometheus.New(
		tsprometheus.WithConstLabels(constLabels),
		tsprometheus.WithInstanceReadinessTimeBuckets(config.Autoscaler.GetInstanceReadinessTimeBuckets()),
	)

	flMC := flprometheus.New(
		flprometheus.WithConstLabels(constLabels),
		flprometheus.WithInstanceCreationTimeBuckets(config.Autoscaler.GetInstanceCreationTimeBuckets()),
		flprometheus.WithInstanceIsRunningTimeBuckets(config.Autoscaler.GetInstanceIsRunningTimeBuckets()),
		flprometheus.WithInstanceDeletionTimeBuckets(config.Autoscaler.GetInstanceDeletionTimeBuckets()),
		flprometheus.WithInstanceLifeDurationBuckets(config.Autoscaler.InstanceLifeDurationBuckets),
	)

	// shutdownCtx is handed to the instance-up function and is cancelled
	// either when taskscaler creation fails or when the scaler is shut down.
	shutdownCtx, shutdownFn := context.WithCancel(context.Background())

	options := []taskscaler.Option{
		taskscaler.WithReservations(),
		taskscaler.WithCapacityPerInstance(config.Autoscaler.CapacityPerInstance),
		taskscaler.WithMaxUseCount(config.Autoscaler.MaxUseCount),
		taskscaler.WithMaxInstances(config.Autoscaler.MaxInstances),
		taskscaler.WithInstanceGroupSettings(fleetingprovider.Settings{
			ConnectorConfig: instanceConnectConfig,
		}),
		taskscaler.WithMetricsCollector(tsMC),
		taskscaler.WithFleetingMetricsCollector(flMC),
		taskscaler.WithInstanceUpFunc(instanceReadyUp(shutdownCtx, config)),
		// Default update intervals; user-configured values are appended
		// further down (presumably later options take precedence — verify
		// against the taskscaler option handling).
		taskscaler.WithUpdateInterval(time.Minute),
		taskscaler.WithUpdateIntervalWhenExpecting(time.Second),
		taskscaler.WithLogger(logger.Named("taskscaler")),
		taskscaler.WithScaleThrottle(config.Autoscaler.ScaleThrottle.Limit, config.Autoscaler.ScaleThrottle.Burst),
	}

	if config.IsFeatureFlagOn(featureflags.UseFleetingAcquireHeartbeats) {
		options = append(options, taskscaler.WithHeartbeatFunc(instanceHeartbeat(config)))
	}

	if store != nil {
		options = append(options, taskscaler.WithStorage(store))
	}

	if config.Autoscaler.UpdateInterval > 0 {
		options = append(options, taskscaler.WithUpdateInterval(config.Autoscaler.UpdateInterval))
	}

	if config.Autoscaler.UpdateIntervalWhenExpecting > 0 {
		options = append(options, taskscaler.WithUpdateIntervalWhenExpecting(config.Autoscaler.UpdateIntervalWhenExpecting))
	}

	if config.Autoscaler.DeleteInstancesOnShutdown {
		options = append(options, taskscaler.WithDeleteInstancesOnShutdown())
	}

	// Bound taskscaler creation to one minute.
	ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancelFn()

	ts, err := p.taskscalerNew(ctx, runner.InstanceGroup(), options...)
	if err != nil {
		// Release everything created so far: the shutdown context and the plugin.
		shutdownFn()
		runner.Kill()
		return nil, false, fmt.Errorf("creating taskscaler: %w", err)
	}

	s = scaler{
		internal: ts,
		// shutdown cancels the shutdown context, shuts down the taskscaler
		// and finally kills the plugin process, in that order.
		shutdown: func(ctx context.Context) {
			shutdownFn()
			ts.Shutdown(ctx)
			runner.Kill()
		},
		configLoadedAt: config.ConfigLoadedAt,
	}

	p.scalers[token] = s

	return s.internal, true, nil
}