in cmd/hubagent/workload/setup.go [117:413]
func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error { //nolint:gocyclo
// TODO: Try to reduce the complexity of this last measured at 33 (failing at > 30) and remove the // nolint:gocyclo
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
klog.ErrorS(err, "unable to create the dynamic client")
return err
}
discoverClient := discovery.NewDiscoveryClientForConfigOrDie(config)
// AllowedPropagatingAPIs and SkippedPropagatingAPIs are mutually exclusive.
// If none of them are set, the resourceConfig by default stores a list of skipped propagation APIs.
resourceConfig := utils.NewResourceConfig(opts.AllowedPropagatingAPIs != "")
if err = resourceConfig.Parse(opts.AllowedPropagatingAPIs); err != nil {
// The program will never go here because the parameters have been checked.
return err
}
if err = resourceConfig.Parse(opts.SkippedPropagatingAPIs); err != nil {
// The program will never go here because the parameters have been checked
return err
}
// setup namespaces we skip propagation
skippedNamespaces := make(map[string]bool)
skippedNamespaces["default"] = true
optionalSkipNS := strings.Split(opts.SkippedPropagatingNamespaces, ";")
for _, ns := range optionalSkipNS {
if len(ns) > 0 {
klog.InfoS("user specified a namespace to skip", "namespace", ns)
skippedNamespaces[ns] = true
}
}
// the manager for all the dynamically created informers
dynamicInformerManager := informer.NewInformerManager(dynamicClient, opts.ResyncPeriod.Duration, ctx.Done())
validator.ResourceInformer = dynamicInformerManager // webhook needs this to check resource scope
validator.RestMapper = mgr.GetRESTMapper() // webhook needs this to validate GVK of resource selector
// Set up a custom controller to reconcile cluster resource placement
crpc := &clusterresourceplacement.Reconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(crpControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
ResourceConfig: resourceConfig,
SkippedNamespaces: skippedNamespaces,
Scheme: mgr.GetScheme(),
UncachedReader: mgr.GetAPIReader(),
}
rateLimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)
var clusterResourcePlacementControllerV1Alpha1 controller.Controller
var clusterResourcePlacementControllerV1Beta1 controller.Controller
var memberClusterPlacementController controller.Controller
if opts.EnableV1Alpha1APIs {
for _, gvk := range v1Alpha1RequiredGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up clusterResourcePlacement v1alpha1 controller")
clusterResourcePlacementControllerV1Alpha1 = controller.NewController(crpControllerV1Alpha1Name, controller.NamespaceKeyFunc, crpc.ReconcileV1Alpha1, rateLimiter)
klog.Info("Setting up member cluster change controller")
mcp := &memberclusterplacement.Reconciler{
InformerManager: dynamicInformerManager,
PlacementController: clusterResourcePlacementControllerV1Alpha1,
}
memberClusterPlacementController = controller.NewController(mcPlacementControllerName, controller.NamespaceKeyFunc, mcp.Reconcile, rateLimiter)
}
if opts.EnableV1Beta1APIs {
for _, gvk := range v1Beta1RequiredGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up clusterResourcePlacement v1beta1 controller")
clusterResourcePlacementControllerV1Beta1 = controller.NewController(crpControllerV1Beta1Name, controller.NamespaceKeyFunc, crpc.Reconcile, rateLimiter)
klog.Info("Setting up clusterResourcePlacement watcher")
if err := (&clusterresourceplacementwatcher.Reconciler{
PlacementController: clusterResourcePlacementControllerV1Beta1,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up the clusterResourcePlacement watcher")
return err
}
klog.Info("Setting up clusterResourceBinding watcher")
if err := (&clusterresourcebindingwatcher.Reconciler{
PlacementController: clusterResourcePlacementControllerV1Beta1,
Client: mgr.GetClient(),
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up the clusterResourceBinding watcher")
return err
}
klog.Info("Setting up clusterSchedulingPolicySnapshot watcher")
if err := (&clusterschedulingpolicysnapshot.Reconciler{
Client: mgr.GetClient(),
PlacementController: clusterResourcePlacementControllerV1Beta1,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up the clusterSchedulingPolicySnapshot watcher")
return err
}
// Set up a new controller to do rollout resources according to CRP rollout strategy
klog.Info("Setting up rollout controller")
if err := (&rollout.Reconciler{
Client: mgr.GetClient(),
UncachedReader: mgr.GetAPIReader(),
MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/30) * math.Ceil(float64(opts.MaxConcurrentClusterPlacement)/10)),
InformerManager: dynamicInformerManager,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up rollout controller")
return err
}
if opts.EnableEvictionAPIs {
for _, gvk := range evictionGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "Unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up cluster resource placement eviction controller")
if err := (&clusterresourceplacementeviction.Reconciler{
Client: mgr.GetClient(),
UncachedReader: mgr.GetAPIReader(),
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up cluster resource placement eviction controller")
return err
}
}
// Set up a controller to do staged update run, rolling out resources to clusters in a stage by stage manner.
if opts.EnableStagedUpdateRunAPIs {
for _, gvk := range clusterStagedUpdateRunGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "Unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up clusterStagedUpdateRun controller")
if err = (&updaterun.Reconciler{
Client: mgr.GetClient(),
InformerManager: dynamicInformerManager,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterStagedUpdateRun controller")
return err
}
}
// Set up the work generator
klog.Info("Setting up work generator")
if err := (&workgenerator.Reconciler{
Client: mgr.GetClient(),
MaxConcurrentReconciles: int(math.Ceil(float64(opts.MaxFleetSizeSupported)/10) * math.Ceil(float64(opts.MaxConcurrentClusterPlacement)/10)),
InformerManager: dynamicInformerManager,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up work generator")
return err
}
// Set up the scheduler
klog.Info("Setting up scheduler")
defaultProfile := profile.NewDefaultProfile()
defaultFramework := framework.NewFramework(defaultProfile, mgr)
defaultSchedulingQueue := queue.NewSimpleClusterResourcePlacementSchedulingQueue(
queue.WithName(schedulerQueueName),
)
// we use one scheduler for every 10 concurrent placement
defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr,
int(math.Ceil(float64(opts.MaxFleetSizeSupported)/50)*math.Ceil(float64(opts.MaxConcurrentClusterPlacement)/10)))
klog.Info("Starting the scheduler")
// Scheduler must run in a separate goroutine as Run() is a blocking call.
wg.Add(1)
go func() {
defer wg.Done()
// Run() blocks and is set to exit on context cancellation.
defaultScheduler.Run(ctx)
klog.InfoS("The scheduler has exited")
}()
// Set up the watchers for the controller
klog.Info("Setting up the clusterResourcePlacement watcher for scheduler")
if err := (&schedulercrpwatcher.Reconciler{
Client: mgr.GetClient(),
SchedulerWorkQueue: defaultSchedulingQueue,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterResourcePlacement watcher for scheduler")
return err
}
klog.Info("Setting up the clusterSchedulingPolicySnapshot watcher for scheduler")
if err := (&schedulercspswatcher.Reconciler{
Client: mgr.GetClient(),
SchedulerWorkQueue: defaultSchedulingQueue,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterSchedulingPolicySnapshot watcher for scheduler")
return err
}
klog.Info("Setting up the clusterResourceBinding watcher for scheduler")
if err := (&schedulercrbwatcher.Reconciler{
Client: mgr.GetClient(),
SchedulerWorkQueue: defaultSchedulingQueue,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterResourceBinding watcher for scheduler")
return err
}
klog.Info("Setting up the memberCluster watcher for scheduler")
if err := (&membercluster.Reconciler{
Client: mgr.GetClient(),
SchedulerWorkQueue: defaultSchedulingQueue,
ClusterEligibilityChecker: clustereligibilitychecker.New(),
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up memberCluster watcher for scheduler")
return err
}
// Set up the controllers for overriding resources.
klog.Info("Setting up the clusterResourceOverride controller")
if err := (&overrider.ClusterResourceReconciler{
Reconciler: overrider.Reconciler{
Client: mgr.GetClient(),
},
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up clusterResourceOverride controller")
return err
}
klog.Info("Setting up the resourceOverride controller")
if err := (&overrider.ResourceReconciler{
Reconciler: overrider.Reconciler{
Client: mgr.GetClient(),
},
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "Unable to set up resourceOverride controller")
return err
}
// Verify cluster inventory CRD installation status.
if opts.EnableClusterInventoryAPIs {
for _, gvk := range clusterInventoryGVKs {
if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil {
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
}
klog.Info("Setting up cluster profile controller")
if err = (&clusterprofile.Reconciler{
Client: mgr.GetClient(),
ClusterProfileNamespace: utils.FleetSystemNamespace,
ClusterUnhealthyThreshold: opts.ClusterUnhealthyThreshold.Duration,
}).SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "unable to set up ClusterProfile controller")
return err
}
}
}
// Set up a new controller to reconcile any resources in the cluster
klog.Info("Setting up resource change controller")
rcr := &resourcechange.Reconciler{
DynamicClient: dynamicClient,
Recorder: mgr.GetEventRecorderFor(resourceChangeControllerName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
PlacementControllerV1Alpha1: clusterResourcePlacementControllerV1Alpha1,
PlacementControllerV1Beta1: clusterResourcePlacementControllerV1Beta1,
}
resourceChangeController := controller.NewController(resourceChangeControllerName, controller.ClusterWideKeyFunc, rcr.Reconcile, rateLimiter)
// Set up a runner that starts all the custom controllers we created above
resourceChangeDetector := &resourcewatcher.ChangeDetector{
DiscoveryClient: discoverClient,
RESTMapper: mgr.GetRESTMapper(),
ClusterResourcePlacementControllerV1Alpha1: clusterResourcePlacementControllerV1Alpha1,
ClusterResourcePlacementControllerV1Beta1: clusterResourcePlacementControllerV1Beta1,
ResourceChangeController: resourceChangeController,
MemberClusterPlacementController: memberClusterPlacementController,
InformerManager: dynamicInformerManager,
ResourceConfig: resourceConfig,
SkippedNamespaces: skippedNamespaces,
ConcurrentClusterPlacementWorker: int(math.Ceil(float64(opts.MaxConcurrentClusterPlacement) / 10)),
ConcurrentResourceChangeWorker: opts.ConcurrentResourceChangeSyncs,
}
if err := mgr.Add(resourceChangeDetector); err != nil {
klog.ErrorS(err, "Failed to setup resource detector")
return err
}
return nil
}