in cluster-autoscaler/main.go [98:267]
// buildAutoscaler assembles a core.Autoscaler from the flag-derived options
// and returns it together with the LoopTrigger that paces its iterations.
//
// The function creates the kube client and shared informer factory, builds
// the scheduler framework handle and the cluster snapshot, configures the
// processors (pod list, scale-up status, scale-down candidates, node group
// set), constructs the autoscaler, then starts the informers and blocks until
// their caches are synced before starting the pod observer.
//
// context is handed to the pod observer; debuggingSnapshotter is forwarded to
// the autoscaler options unchanged.
//
// A non-nil error is returned when the framework handle, the
// ProvisioningRequest client or injector, or the autoscaler itself cannot be
// created, or when an informer reports that it did not sync.
func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
	// Every setting below originates from the command-line flags.
	cfg := flags.AutoscalingOptions()
	kubeClient := kube_util.CreateKubeClient(cfg.KubeClientOpts)

	// Informer transform that drops ManagedFields for memory efficiency.
	// Objects whose metadata cannot be accessed are passed through unchanged.
	stripManagedFields := func(obj interface{}) (interface{}, error) {
		accessor, accErr := meta.Accessor(obj)
		if accErr != nil {
			return obj, nil
		}
		accessor.SetManagedFields(nil)
		return obj, nil
	}
	informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTransform(stripManagedFields))

	fwHandle, err := framework.NewHandle(informerFactory, cfg.SchedulerConfig, cfg.DynamicResourceAllocationEnabled)
	if err != nil {
		return nil, nil, err
	}

	deleteOptions := options.NewNodeDeleteOptions(cfg)
	drainabilityRules := rules.Default(deleteOptions)

	// The delta store is the default; it is swapped for the basic store when DRA is on.
	var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(cfg.ClusterSnapshotParallelism)
	if cfg.DynamicResourceAllocationEnabled {
		// TODO(DRA): Remove this once DeltaSnapshotStore is integrated with DRA.
		klog.Warningf("Using BasicSnapshotStore instead of DeltaSnapshotStore because DRA is enabled. Autoscaling performance/scalability might be decreased.")
		snapshotStore = store.NewBasicSnapshotStore()
	}

	clusterSnapshot := predicate.NewPredicateSnapshot(snapshotStore, fwHandle, cfg.DynamicResourceAllocationEnabled)
	autoscalerOpts := core.AutoscalerOptions{
		AutoscalingOptions:   cfg,
		FrameworkHandle:      fwHandle,
		ClusterSnapshot:      clusterSnapshot,
		KubeClient:           kubeClient,
		InformerFactory:      informerFactory,
		DebuggingSnapshotter: debuggingSnapshotter,
		DeleteOptions:        deleteOptions,
		DrainabilityRules:    drainabilityRules,
		ScaleUpOrchestrator:  orchestrator.New(),
	}

	// Start from the default processors, then override individual ones below.
	autoscalerOpts.Processors = ca_processors.DefaultProcessors(cfg)
	autoscalerOpts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(&cfg.NodeInfoCacheExpireTime, cfg.ForceDaemonSets)
	podProcessor := podlistprocessor.NewDefaultPodListProcessor(scheduling.ScheduleAnywhere)

	// Remains nil unless ProvisioningRequest support is enabled; it is passed to
	// the loop trigger at the end of this function either way.
	var provReqInjector *provreq.ProvisioningRequestPodsInjector
	if cfg.ProvisioningRequestEnabled {
		podProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

		restConfig := kube_util.GetKubeConfig(cfg.KubeClientOpts)
		provReqClient, err := provreqclient.NewProvisioningRequestClient(restConfig)
		if err != nil {
			return nil, nil, err
		}

		// Plain assignment: the injector declared outside this branch must be set.
		provReqInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, autoscalerOpts.ProvisioningRequestInitialBackoffTime, autoscalerOpts.ProvisioningRequestMaxBackoffTime, autoscalerOpts.ProvisioningRequestMaxBackoffCacheSize, autoscalerOpts.CheckCapacityBatchProcessing, autoscalerOpts.CheckCapacityProcessorInstance)
		if err != nil {
			return nil, nil, err
		}
		podProcessor.AddProcessor(provReqInjector)

		// The check-capacity class receives the injector only when batch processing is on.
		var checkCapacityInjector *provreq.ProvisioningRequestPodsInjector
		if cfg.CheckCapacityBatchProcessing {
			klog.Infof("Batch processing for check capacity requests is enabled. Passing provisioning request injector to check capacity processor.")
			checkCapacityInjector = provReqInjector
		}

		provisioningClasses := []provreqorchestrator.ProvisioningClass{
			checkcapacity.New(provReqClient, checkCapacityInjector),
			besteffortatomic.New(provReqClient),
		}
		// Replaces the orchestrator set when the options were first built.
		autoscalerOpts.ScaleUpOrchestrator = provreqorchestrator.NewWrapperOrchestrator(provreqorchestrator.New(provReqClient, provisioningClasses))

		// The same processor is registered both as a loop-start observer and as a pod list processor.
		provReqProcessor := provreq.NewProvReqProcessor(provReqClient, autoscalerOpts.CheckCapacityProcessorInstance)
		autoscalerOpts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provReqProcessor})
		podProcessor.AddProcessor(provReqProcessor)
		autoscalerOpts.Processors.ScaleUpEnforcer = provreq.NewProvisioningRequestScaleUpEnforcer()
	}

	if cfg.ProactiveScaleupEnabled {
		backoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry()
		injectionProcessor := podinjection.NewPodInjectionPodListProcessor(backoffRegistry)
		limitProcessor := podinjection.NewEnforceInjectedPodsLimitProcessor(cfg.PodInjectionLimit)
		// Pod injection goes first, the processors built so far in the middle, the limit enforcer last.
		podProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{injectionProcessor, podProcessor, limitProcessor})

		// The fake-pods status processor must precede the default ScaleUpStatusProcessor:
		// it filters fake pods out of the scale-up status so that no events are emitted for them.
		autoscalerOpts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{
			podinjection.NewFakePodsScaleUpStatusProcessor(backoffRegistry),
			autoscalerOpts.Processors.ScaleUpStatusProcessor,
		})
	}
	autoscalerOpts.Processors.PodListProcessor = podProcessor

	// Scale-down candidates are sorted by the empty-node comparer and then by the previous-candidates comparer.
	previousCandidates := previouscandidates.NewPreviousCandidates()
	comparers := []scaledowncandidates.CandidatesComparer{
		emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(autoscalerOpts.ClusterSnapshot), deleteOptions, drainabilityRules),
		previousCandidates,
	}
	autoscalerOpts.Processors.ScaleDownCandidatesNotifier.Register(previousCandidates)

	candidatesProcessor := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor()
	candidatesProcessor.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(comparers))
	if cfg.ScaleDownDelayTypeLocal {
		delayProcessor := scaledowncandidates.NewScaleDownCandidatesDelayProcessor()
		candidatesProcessor.Register(delayProcessor)
		autoscalerOpts.Processors.ScaleStateNotifier.Register(delayProcessor)
	}
	autoscalerOpts.Processors.ScaleDownNodeProcessor = candidatesProcessor

	// Explicit balancing labels take precedence; otherwise the comparator is chosen
	// per cloud provider. AWS and GCE additionally replace the template node info provider.
	var comparator nodegroupset.NodeInfoComparator
	if len(cfg.BalancingLabels) > 0 {
		comparator = nodegroupset.CreateLabelNodeInfoComparator(cfg.BalancingLabels)
	} else {
		buildComparator := nodegroupset.CreateGenericNodeInfoComparator
		switch cfg.CloudProviderName {
		case cloudprovider.AzureProviderName:
			buildComparator = nodegroupset.CreateAzureNodeInfoComparator
		case cloudprovider.AwsProviderName:
			buildComparator = nodegroupset.CreateAwsNodeInfoComparator
			autoscalerOpts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAsgTagResourceNodeInfoProvider(&cfg.NodeInfoCacheExpireTime, cfg.ForceDaemonSets)
		case cloudprovider.GceProviderName:
			buildComparator = nodegroupset.CreateGceNodeInfoComparator
			autoscalerOpts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAnnotationNodeInfoProvider(&cfg.NodeInfoCacheExpireTime, cfg.ForceDaemonSets)
		}
		comparator = buildComparator(cfg.BalancingExtraIgnoredLabels, cfg.NodeGroupSetRatios)
	}
	autoscalerOpts.Processors.NodeGroupSetProcessor = &nodegroupset.BalancingNodeGroupSetProcessor{
		Comparator: comparator,
	}

	// These metrics should be published only once.
	metrics.UpdateCPULimitsCores(cfg.MinCoresTotal, cfg.MaxCoresTotal)
	metrics.UpdateMemoryLimitsBytes(cfg.MinMemoryTotal, cfg.MaxMemoryTotal)

	// Initialize metrics.
	metrics.InitMetrics()

	// Create autoscaler.
	autoscaler, err := core.NewAutoscaler(autoscalerOpts, informerFactory)
	if err != nil {
		return nil, nil, err
	}

	// Informers are started only now, after the autoscaler is fully constructed,
	// because NewAutoscaler might have registered additional informers in the factory.
	stopCh := make(chan struct{})
	informerFactory.Start(stopCh)
	klog.Info("Initializing resource informers, blocking until caches are synced")
	for _, ok := range informerFactory.WaitForCacheSync(stopCh) {
		if !ok {
			return nil, nil, fmt.Errorf("unable to start and sync resource informers")
		}
	}

	// The pod observer is given its own, freshly created kube client.
	podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(cfg.KubeClientOpts))

	// The ProvisioningRequestPodsInjector doubles as the provisioningRequestProcessingTimesGetter:
	// besides injecting pods it also marks each ProvisioningRequest as accepted or failed,
	// so it can report the last time a ProvisioningRequest was processed.
	trigger := loop.NewLoopTrigger(autoscaler, provReqInjector, podObserver, cfg.ScanInterval)
	return autoscaler, trigger, nil
}