cmd/hub-net-controller-manager/main.go (221 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ // Binary hub-net-controller-manager watches fleet-networking CRDs in the hub cluster to export/import multi-cluster // services. package main import ( "flag" "fmt" "os" "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/rand" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/discovery" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/klog/v2" "sigs.k8s.io/cloud-provider-azure/pkg/azclient" "sigs.k8s.io/cloud-provider-azure/pkg/azclient/policy/ratelimit" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" //+kubebuilder:scaffold:imports clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" "go.goms.io/fleet/pkg/utils" "go.goms.io/fleet/pkg/utils/cloudconfig/azure" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" fleetnetv1beta1 "go.goms.io/fleet-networking/api/v1beta1" "go.goms.io/fleet-networking/pkg/controllers/hub/endpointsliceexport" "go.goms.io/fleet-networking/pkg/controllers/hub/internalserviceexport" "go.goms.io/fleet-networking/pkg/controllers/hub/internalserviceimport" "go.goms.io/fleet-networking/pkg/controllers/hub/membercluster" "go.goms.io/fleet-networking/pkg/controllers/hub/serviceimport" "go.goms.io/fleet-networking/pkg/controllers/hub/trafficmanagerbackend" "go.goms.io/fleet-networking/pkg/controllers/hub/trafficmanagerprofile" ) var ( scheme = runtime.NewScheme() metricsAddr = flag.String("metrics-bind-address", ":8080", "The address the metric endpoint binds to.") probeAddr = flag.String("health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") enableLeaderElection = flag.Bool("leader-elect", true, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") leaderElectionNamespace = flag.String("leader-election-namespace", "fleet-system", "The namespace in which the leader election resource will be created.") internalServiceExportRetryInterval = flag.Duration("internalserviceexport-retry-interval", 2*time.Second, "The wait time for the internalserviceexport controller to requeue the request and to wait for the"+ "ServiceImport controller to resolve the service Spec") forceDeleteWaitTime = flag.Duration("force-delete-wait-time", 15*time.Minute, "The duration the fleet hub agent waits before trying to force delete a member cluster.") enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.") enableTrafficManagerFeature = flag.Bool("enable-traffic-manager-feature", true, "If set, the traffic manager feature will be enabled.") cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/azure.json", "The path to the cloud config file which will be used to access the Azure resource.") ) var ( trafficManagerFeatureRequiredGVKs = []schema.GroupVersionKind{ fleetnetv1beta1.GroupVersion.WithKind(fleetnetv1beta1.TrafficManagerProfileKind), fleetnetv1beta1.GroupVersion.WithKind(fleetnetv1beta1.TrafficManagerBackendKind), } ) func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(fleetnetv1alpha1.AddToScheme(scheme)) utilruntime.Must(fleetnetv1beta1.AddToScheme(scheme)) utilruntime.Must(clusterv1beta1.AddToScheme(scheme)) klog.InitFlags(nil) //+kubebuilder:scaffold:scheme } func main() { flag.Parse() rand.Seed(time.Now().UnixNano()) handleExitFunc := func() { klog.Flush() } exitWithErrorFunc := func() { handleExitFunc() os.Exit(1) } defer handleExitFunc() flag.VisitAll(func(f *flag.Flag) { klog.InfoS("flag:", "name", f.Name, "value", f.Value) }) // Set up controller-runtime logger ctrl.SetLogger(zap.New(zap.UseDevMode(true))) hubConfig := ctrl.GetConfigOrDie() mgr, err := ctrl.NewManager(hubConfig, ctrl.Options{ Scheme: scheme, Metrics: metricsserver.Options{ BindAddress: *metricsAddr, }, WebhookServer: webhook.NewServer(webhook.Options{ Port: 9443, }), HealthProbeBindAddress: *probeAddr, LeaderElection: *enableLeaderElection, LeaderElectionNamespace: *leaderElectionNamespace, LeaderElectionID: "2bf2b407.hub.networking.fleet.azure.com", }) if err != nil { klog.ErrorS(err, "Unable to start manager") exitWithErrorFunc() } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { klog.ErrorS(err, "Unable to set up health check") exitWithErrorFunc() } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { klog.ErrorS(err, "Unable to set up ready check") exitWithErrorFunc() } ctx := ctrl.SetupSignalHandler() klog.V(1).InfoS("Start to setup EndpointsliceExport controller") if err := (&endpointsliceexport.Reconciler{ HubClient: mgr.GetClient(), }).SetupWithManager(ctx, mgr); err != nil { klog.ErrorS(err, "Unable to create EndpointsliceExport controller") exitWithErrorFunc() } klog.V(1).InfoS("Start to setup InternalServiceExport controller") if err := (&internalserviceexport.Reconciler{ Client: mgr.GetClient(), RetryInternal: *internalServiceExportRetryInterval, }).SetupWithManager(mgr); err != nil { klog.ErrorS(err, "Unable to create InternalServiceExport controller") exitWithErrorFunc() } klog.V(1).InfoS("Start to setup InternalServiceImport controller") if err := (&internalserviceimport.Reconciler{ HubClient: mgr.GetClient(), }).SetupWithManager(ctx, mgr); err != nil { klog.ErrorS(err, "Unable to create InternalServiceImport controller") exitWithErrorFunc() } klog.V(1).InfoS("Start to setup ServiceImport controller") if err := (&serviceimport.Reconciler{ Client: mgr.GetClient(), Recorder: mgr.GetEventRecorderFor(serviceimport.ControllerName), }).SetupWithManager(ctx, mgr); err != nil { klog.ErrorS(err, "Unable to create ServiceImport controller") exitWithErrorFunc() } discoverClient := discovery.NewDiscoveryClientForConfigOrDie(hubConfig) if *enableV1Beta1APIs { gvk := clusterv1beta1.GroupVersion.WithKind(clusterv1beta1.MemberClusterKind) if utils.CheckCRDInstalled(discoverClient, gvk) == nil { klog.V(1).InfoS("Start to setup MemberCluster controller") if err := (&membercluster.Reconciler{ Client: mgr.GetClient(), Recorder: mgr.GetEventRecorderFor(membercluster.ControllerName), ForceDeleteWaitTime: *forceDeleteWaitTime, }).SetupWithManager(mgr); err != nil { klog.ErrorS(err, "Unable to create MemberCluster controller") exitWithErrorFunc() } } } if *enableTrafficManagerFeature { klog.V(1).InfoS("Traffic manager feature is enabled, checking the required CRDs") for _, gvk := range trafficManagerFeatureRequiredGVKs { if err = utils.CheckCRDInstalled(discoverClient, gvk); err != nil { klog.ErrorS(err, "Unable to find the required CRD", "GVK", gvk) exitWithErrorFunc() } } klog.V(1).InfoS("Traffic manager feature is enabled, loading cloud config and creating azure clients", "cloudConfigFile", *cloudConfigFile) cloudConfig, err := azure.NewCloudConfigFromFile(*cloudConfigFile) if err != nil { klog.ErrorS(err, "Unable to load cloud config", "file name", *cloudConfigFile) exitWithErrorFunc() } cloudConfig.SetUserAgent("fleet-hub-net-controller-manager") klog.V(1).InfoS("Cloud config loaded", "cloudConfig", cloudConfig) profilesClient, endpointsClient, err := initAzureTrafficManagerClients(cloudConfig) if err != nil { klog.ErrorS(err, "Unable to create Azure Traffic Manager clients") exitWithErrorFunc() } klog.V(1).InfoS("Start to setup TrafficManagerProfile controller") if err := (&trafficmanagerprofile.Reconciler{ Client: mgr.GetClient(), ProfilesClient: profilesClient, }).SetupWithManager(mgr); err != nil { klog.ErrorS(err, "Unable to create TrafficManagerProfile controller") exitWithErrorFunc() } klog.V(1).InfoS("Start to setup TrafficManagerBackend controller") if err := (&trafficmanagerbackend.Reconciler{ Client: mgr.GetClient(), ProfilesClient: profilesClient, EndpointsClient: endpointsClient, // serviceImport controller has already enabled the internalServiceExportIndexer. // Therefore, no need to setup it again. }).SetupWithManager(ctx, mgr, true); err != nil { klog.ErrorS(err, "Unable to create TrafficManagerProfile controller") exitWithErrorFunc() } } klog.V(1).InfoS("Starting ServiceExportImport controller manager") if err := mgr.Start(ctx); err != nil { klog.ErrorS(err, "Problem running manager") exitWithErrorFunc() } } // initAzureTrafficManagerClients initializes the Azure Traffic Manager profiles and endpoints clients. func initAzureTrafficManagerClients(cloudConfig *azure.CloudConfig) (*armtrafficmanager.ProfilesClient, *armtrafficmanager.EndpointsClient, error) { authProvider, err := azclient.NewAuthProvider(&cloudConfig.ARMClientConfig, &cloudConfig.AzureAuthConfig) if err != nil { return nil, nil, fmt.Errorf("failed to create Azure auth provider: %w", err) } factoryConfig := &azclient.ClientFactoryConfig{ CloudProviderBackoff: true, SubscriptionID: cloudConfig.SubscriptionID, } options, err := azclient.GetDefaultResourceClientOption(&cloudConfig.ARMClientConfig, factoryConfig) if err != nil { return nil, nil, fmt.Errorf("failed to get default resource client option: %w", err) } if rateLimitPolicy := ratelimit.NewRateLimitPolicy(cloudConfig.Config); rateLimitPolicy != nil { options.ClientOptions.PerCallPolicies = append(options.ClientOptions.PerCallPolicies, rateLimitPolicy) } profilesClient, err := armtrafficmanager.NewProfilesClient(cloudConfig.SubscriptionID, authProvider.GetAzIdentity(), options) if err != nil { return nil, nil, fmt.Errorf("failed to create Azure trafficManager profiles client: %w", err) } endpointsClient, err := armtrafficmanager.NewEndpointsClient(cloudConfig.SubscriptionID, authProvider.GetAzIdentity(), options) if err != nil { return nil, nil, fmt.Errorf("failed to create Azure trafficManager endpoints client: %w", err) } return profilesClient, endpointsClient, nil }