func()

in executors/internal/autoscaler/provider.go [117:261]


// init returns the taskscaler for the runner identified by config's token,
// creating it and caching it in p.scalers on first use.
//
// The boolean result is true when the taskscaler was created by this call, or
// when config.ConfigLoadedAt differs from the value recorded for the cached
// scaler on the previous call (i.e. the config has been reloaded since).
//
// An error is returned when config has no autoscaler section, or when any step
// of the setup (plugin config marshaling, state storage creation, plugin start,
// connector key read, taskscaler creation) fails. On failure after the plugin
// has been started, the plugin is killed before returning.
func (p *provider) init(config *common.RunnerConfig) (taskscaler.Taskscaler, bool, error) {
	if config.Autoscaler == nil {
		return nil, false, fmt.Errorf("executor requires autoscaler config")
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	token := config.GetToken()

	s, ok := p.scalers[token]
	if ok {
		// detect if the config has been reloaded
		refresh := s.configLoadedAt != config.ConfigLoadedAt
		s.configLoadedAt = config.ConfigLoadedAt
		// s is a copy of the map element (the map stores scaler values), so
		// the updated timestamp must be written back; otherwise every call
		// after the first reload would keep reporting refresh == true.
		p.scalers[token] = s

		return s.internal, refresh, nil
	}

	pluginCfg, err := config.Autoscaler.PluginConfig.JSON()
	if err != nil {
		return nil, false, fmt.Errorf("marshaling plugin config: %w", err)
	}

	logger := logger.New(config.RunnerCredentials.Log())

	// Optional file-backed state storage; store stays nil when disabled.
	var store storage.Storage
	if config.Autoscaler.StateStorage.Enabled {
		dir := config.Autoscaler.StateStorage.Dir
		if dir == "" {
			// Default to a ".taskscaler" directory next to the runner config.
			dir = filepath.Join(config.ConfigDir, ".taskscaler")
		}

		// Each runner gets its own sub-directory named after its shortened token.
		store, err = storage.NewFileStorage(filepath.Join(dir, helpers.ShortenToken(config.Token)))
		if err != nil {
			return nil, false, fmt.Errorf("creating state storage: %w", err)
		}
	}

	// From here on, every error path must kill the plugin runner.
	runner, err := p.fleetingRunPlugin(config.Autoscaler.Plugin, pluginCfg, fleeting.WithPluginLogger(logger.Named("fleeting-plugin")))
	if err != nil {
		return nil, false, fmt.Errorf("running autoscaler plugin: %w", err)
	}

	instanceConnectConfig := fleetingprovider.ConnectorConfig{
		OS:                   config.Autoscaler.ConnectorConfig.OS,
		Arch:                 config.Autoscaler.ConnectorConfig.Arch,
		Protocol:             fleetingprovider.Protocol(config.Autoscaler.ConnectorConfig.Protocol),
		ProtocolPort:         config.Autoscaler.ConnectorConfig.ProtocolPort,
		Username:             config.Autoscaler.ConnectorConfig.Username,
		Password:             config.Autoscaler.ConnectorConfig.Password,
		UseStaticCredentials: config.Autoscaler.ConnectorConfig.UseStaticCredentials,
		Keepalive:            config.Autoscaler.ConnectorConfig.Keepalive,
		Timeout:              config.Autoscaler.ConnectorConfig.Timeout,
	}

	// The connector key is configured as a path; load its contents when set.
	if config.Autoscaler.ConnectorConfig.KeyPathname != "" {
		key, err := os.ReadFile(config.Autoscaler.ConnectorConfig.KeyPathname)
		if err != nil {
			runner.Kill()

			return nil, false, fmt.Errorf("reading instance group connector key: %w", err)
		}
		instanceConnectConfig.Key = key
	}

	// Labels shared by both the taskscaler and fleeting metrics collectors.
	constLabels := prometheus.Labels{
		"runner":      config.ShortDescription(),
		"runner_name": config.Name,
		"system_id":   config.GetSystemID(),
	}

	tsMC := tsprometheus.New(
		tsprometheus.WithConstLabels(constLabels),
		tsprometheus.WithInstanceReadinessTimeBuckets(config.Autoscaler.GetInstanceReadinessTimeBuckets()),
	)
	flMC := flprometheus.New(
		flprometheus.WithConstLabels(constLabels),
		flprometheus.WithInstanceCreationTimeBuckets(config.Autoscaler.GetInstanceCreationTimeBuckets()),
		flprometheus.WithInstanceIsRunningTimeBuckets(config.Autoscaler.GetInstanceIsRunningTimeBuckets()),
		flprometheus.WithInstanceDeletionTimeBuckets(config.Autoscaler.GetInstanceDeletionTimeBuckets()),
		// NOTE(review): this reads the field directly while the sibling
		// options use Get...Buckets() accessors — confirm this is intended.
		flprometheus.WithInstanceLifeDurationBuckets(config.Autoscaler.InstanceLifeDurationBuckets),
	)

	// shutdownCtx is handed to the instance-up callback; it is canceled when
	// the scaler is shut down or when taskscaler creation fails below.
	shutdownCtx, shutdownFn := context.WithCancel(context.Background())

	options := []taskscaler.Option{
		taskscaler.WithReservations(),
		taskscaler.WithCapacityPerInstance(config.Autoscaler.CapacityPerInstance),
		taskscaler.WithMaxUseCount(config.Autoscaler.MaxUseCount),
		taskscaler.WithMaxInstances(config.Autoscaler.MaxInstances),
		taskscaler.WithInstanceGroupSettings(fleetingprovider.Settings{
			ConnectorConfig: instanceConnectConfig,
		}),
		taskscaler.WithMetricsCollector(tsMC),
		taskscaler.WithFleetingMetricsCollector(flMC),
		taskscaler.WithInstanceUpFunc(instanceReadyUp(shutdownCtx, config)),
		taskscaler.WithUpdateInterval(time.Minute),
		taskscaler.WithUpdateIntervalWhenExpecting(time.Second),
		taskscaler.WithLogger(logger.Named("taskscaler")),
		taskscaler.WithScaleThrottle(config.Autoscaler.ScaleThrottle.Limit, config.Autoscaler.ScaleThrottle.Burst),
	}

	if config.IsFeatureFlagOn(featureflags.UseFleetingAcquireHeartbeats) {
		options = append(options, taskscaler.WithHeartbeatFunc(instanceHeartbeat(config)))
	}

	if store != nil {
		options = append(options, taskscaler.WithStorage(store))
	}

	// Configured intervals are appended after the defaults set above;
	// presumably later options take precedence — confirm against taskscaler.
	if config.Autoscaler.UpdateInterval > 0 {
		options = append(options, taskscaler.WithUpdateInterval(config.Autoscaler.UpdateInterval))
	}

	if config.Autoscaler.UpdateIntervalWhenExpecting > 0 {
		options = append(options, taskscaler.WithUpdateIntervalWhenExpecting(config.Autoscaler.UpdateIntervalWhenExpecting))
	}

	if config.Autoscaler.DeleteInstancesOnShutdown {
		options = append(options, taskscaler.WithDeleteInstancesOnShutdown())
	}

	// Bound taskscaler construction to one minute.
	ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancelFn()

	ts, err := p.taskscalerNew(ctx, runner.InstanceGroup(), options...)
	if err != nil {
		shutdownFn()
		runner.Kill()

		return nil, false, fmt.Errorf("creating taskscaler: %w", err)
	}

	s = scaler{
		internal: ts,
		// shutdown cancels shutdownCtx, shuts the taskscaler down and then
		// kills the plugin process, in that order.
		shutdown: func(ctx context.Context) {
			shutdownFn()
			ts.Shutdown(ctx)
			runner.Kill()
		},
		configLoadedAt: config.ConfigLoadedAt,
	}

	p.scalers[token] = s

	return s.internal, true, nil
}