cmd/member-net-controller-manager/main.go (354 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ // Binary member-net-controller-manager watches fleet-networking CRDs in the member cluster to export/import multi-cluster // services. package main import ( "context" "flag" "fmt" "os" "os/signal" "sync" "syscall" "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" "k8s.io/klog/v2" "sigs.k8s.io/cloud-provider-azure/pkg/azclient" "sigs.k8s.io/cloud-provider-azure/pkg/azclient/policy/ratelimit" "sigs.k8s.io/cloud-provider-azure/pkg/azclient/publicipaddressclient" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" //+kubebuilder:scaffold:imports clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1" "go.goms.io/fleet/pkg/utils/cloudconfig/azure" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" "go.goms.io/fleet-networking/pkg/common/env" "go.goms.io/fleet-networking/pkg/common/hubconfig" "go.goms.io/fleet-networking/pkg/controllers/member/endpointslice" "go.goms.io/fleet-networking/pkg/controllers/member/endpointsliceexport" "go.goms.io/fleet-networking/pkg/controllers/member/endpointsliceimport" imcv1alpha1 "go.goms.io/fleet-networking/pkg/controllers/member/internalmembercluster/v1alpha1" imcv1beta1 "go.goms.io/fleet-networking/pkg/controllers/member/internalmembercluster/v1beta1" "go.goms.io/fleet-networking/pkg/controllers/member/internalserviceexport" "go.goms.io/fleet-networking/pkg/controllers/member/internalserviceimport" "go.goms.io/fleet-networking/pkg/controllers/member/serviceexport" "go.goms.io/fleet-networking/pkg/controllers/member/serviceimport" ) var ( scheme = runtime.NewScheme() hubMetricsAddr = flag.String("hub-metrics-bind-address", ":8080", "The address of hub controller manager the metric endpoint binds to.") hubProbeAddr = flag.String("hub-health-probe-bind-address", ":8081", "The address of hub controller manager the probe endpoint binds to.") metricsAddr = flag.String("member-metrics-bind-address", ":8090", "The address of member controller manager the metric endpoint binds to.") probeAddr = flag.String("member-health-probe-bind-address", ":8091", "The address of member controller manager the probe endpoint binds to.") enableLeaderElection = flag.Bool("leader-elect", true, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") leaderElectionNamespace = flag.String("leader-election-namespace", "fleet-system", "The namespace in which the leader election resource will be created.") tlsClientInsecure = flag.Bool("tls-insecure", false, "Enable TLSClientConfig.Insecure property. Enabling this will make the connection inSecure (should be 'true' for testing purpose only.)") fleetSystemNamespace = flag.String("fleet-system-namespace", "fleet-system", "The reserved system namespace used by fleet.") isV1Alpha1APIEnabled = flag.Bool("enable-v1alpha1-apis", true, "If set, the agents will watch for the v1alpha1 APIs.") isV1Beta1APIEnabled = flag.Bool("enable-v1beta1-apis", false, "If set, the agents will watch for the v1beta1 APIs.") enableTrafficManagerFeature = flag.Bool("enable-traffic-manager-feature", true, "If set, the traffic manager feature will be enabled.") cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/azure.json", "The path to the cloud config file which will be used to access the Azure resource.") ) func init() { klog.InitFlags(nil) utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(fleetnetv1alpha1.AddToScheme(scheme)) utilruntime.Must(fleetv1alpha1.AddToScheme(scheme)) utilruntime.Must(clusterv1beta1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } func main() { flag.Parse() rand.Seed(time.Now().UnixNano()) handleExitFunc := func() { klog.Flush() } exitWithErrorFunc := func() { handleExitFunc() os.Exit(1) } defer handleExitFunc() flag.VisitAll(func(f *flag.Flag) { klog.InfoS("flag:", "name", f.Name, "value", f.Value) }) // Set up controller-runtime logger ctrl.SetLogger(zap.New(zap.UseDevMode(true))) memberConfig, memberOptions := prepareMemberParameters() hubConfig, hubOptions, err := prepareHubParameters(memberConfig) if err != nil { exitWithErrorFunc() } // Setup hub controller manager. hubMgr, err := ctrl.NewManager(hubConfig, *hubOptions) if err != nil { klog.ErrorS(err, "Unable to start hub manager") exitWithErrorFunc() } if err := hubMgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { klog.ErrorS(err, "Unable to set up health check for hub manager") exitWithErrorFunc() } if err := hubMgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { klog.ErrorS(err, "Unable to set up ready check for hub manager") exitWithErrorFunc() } // Setup member controller manager. memberMgr, err := ctrl.NewManager(memberConfig, *memberOptions) if err != nil { klog.ErrorS(err, "Unable to start member manager") exitWithErrorFunc() } if err := memberMgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { klog.ErrorS(err, "Unable to set up health check for member manager") exitWithErrorFunc() } if err := memberMgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { klog.ErrorS(err, "Unable to set up ready check for member manager") exitWithErrorFunc() } ctx, cancel := context.WithCancel(context.Background()) klog.V(1).InfoS("Setup controllers with controller manager") if err := setupControllersWithManager(ctx, hubMgr, memberMgr); err != nil { klog.ErrorS(err, "Unable to setup controllers with manager") exitWithErrorFunc() } // All managers should stop if either of them is dead or Linux SIGTERM or SIGINT signal is received ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt, syscall.SIGTERM) go func() { <-ch klog.Info("Received termination, signaling shutdown ServiceExportImport agent") cancel() }() var startErrors []error wg := &sync.WaitGroup{} wg.Add(1) go func() { klog.V(1).InfoS("Starting hub manager for ServiceExportImport agent") defer func() { wg.Done() klog.V(1).InfoS("Shutting down hub manager") cancel() }() if err := hubMgr.Start(ctx); err != nil { klog.ErrorS(err, "Failed to start hub manager") startErrors = append(startErrors, err) } }() wg.Add(1) go func() { klog.V(1).InfoS("Starting member manager for ServiceExportImport agent") defer func() { klog.V(1).InfoS("Shutting down member manager") wg.Done() cancel() }() if err = memberMgr.Start(ctx); err != nil { klog.ErrorS(err, "Failed to start member manager") startErrors = append(startErrors, err) } }() wg.Wait() if len(startErrors) > 0 { exitWithErrorFunc() } } func prepareHubParameters(memberConfig *rest.Config) (*rest.Config, *ctrl.Options, error) { hubConfig, err := hubconfig.PrepareHubConfig(*tlsClientInsecure) if err != nil { klog.ErrorS(err, "Failed to get hub config") return nil, nil, err } mcHubNamespace, err := hubconfig.FetchMemberClusterNamespace() if err != nil { klog.ErrorS(err, "Failed to get member cluster hub namespace") return nil, nil, err } hubOptions := &ctrl.Options{ Scheme: scheme, Metrics: metricsserver.Options{ BindAddress: *hubMetricsAddr, }, WebhookServer: webhook.NewServer(webhook.Options{ Port: 9443, }), HealthProbeBindAddress: *hubProbeAddr, LeaderElection: *enableLeaderElection, LeaderElectionID: "2bf2b407.hub.networking.fleet.azure.com", LeaderElectionNamespace: *leaderElectionNamespace, // This requires we have access to resource "leases" in API group "coordination.k8s.io" under leaderElectionNamespace. LeaderElectionConfig: memberConfig, // Restricts the manager's cache to watch objects in the member hub namespace. Cache: cache.Options{ DefaultNamespaces: map[string]cache.Config{ mcHubNamespace: {}, }, }, } return hubConfig, hubOptions, nil } func prepareMemberParameters() (*rest.Config, *ctrl.Options) { memberOpts := &ctrl.Options{ Scheme: scheme, Metrics: metricsserver.Options{ BindAddress: *metricsAddr, }, WebhookServer: webhook.NewServer(webhook.Options{ Port: 8443, }), HealthProbeBindAddress: *probeAddr, LeaderElection: *enableLeaderElection, LeaderElectionNamespace: *leaderElectionNamespace, LeaderElectionID: "2bf2b407.member.networking.fleet.azure.com", } return ctrl.GetConfigOrDie(), memberOpts } func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager.Manager) error { klog.V(1).InfoS("Begin to setup controllers with controller manager") mcName, err := env.LookupMemberClusterName() if err != nil { klog.ErrorS(err, "Member cluster name cannot be empty") return err } mcHubNamespace, err := hubconfig.FetchMemberClusterNamespace() if err != nil { klog.ErrorS(err, "Failed to get member cluster hub namespace") return err } memberClient := memberMgr.GetClient() hubClient := hubMgr.GetClient() klog.V(1).InfoS("Create endpointslice controller") if err := (&endpointslice.Reconciler{ MemberClusterID: mcName, MemberClient: memberClient, HubClient: hubClient, HubNamespace: mcHubNamespace, }).SetupWithManager(ctx, memberMgr); err != nil { klog.ErrorS(err, "Unable to create endpointslice controller") return err } klog.V(1).InfoS("Create endpointsliceexport controller") if err := (&endpointsliceexport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, }).SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create endpointsliceexport controller") return err } klog.V(1).InfoS("Create endpointsliceimport controller") if err := (&endpointsliceimport.Reconciler{ MemberClusterID: mcName, MemberClient: memberClient, HubClient: hubClient, FleetSystemNamespace: *fleetSystemNamespace, }).SetupWithManager(ctx, memberMgr, hubMgr); err != nil { klog.ErrorS(err, "Unable to create endpointsliceimport controller") return err } klog.V(1).InfoS("Create internalserviceexport controller") if err := (&internalserviceexport.Reconciler{ MemberClusterID: mcName, MemberClient: memberClient, HubClient: hubClient, Recorder: memberMgr.GetEventRecorderFor(internalserviceexport.ControllerName), }).SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalserviceexport controller") return err } klog.V(1).InfoS("Create internalserviceimport controller") if err := (&internalserviceimport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, }).SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalserviceimport controller") return err } var azurePublicIPAddressClient publicipaddressclient.Interface var resourceGroupName string if *enableTrafficManagerFeature { klog.V(1).InfoS("Traffic manager feature is enabled, loading cloud config and creating azure clients", "cloudConfigFile", *cloudConfigFile) cloudConfig, err := azure.NewCloudConfigFromFile(*cloudConfigFile) if err != nil { klog.ErrorS(err, "Unable to load cloud config", "file name", *cloudConfigFile) return err } cloudConfig.SetUserAgent("fleet-member-net-controller-manager") klog.V(1).InfoS("Cloud config loaded", "cloudConfig", cloudConfig) azurePublicIPAddressClient, err = initAzureNetworkClients(cloudConfig) if err != nil { klog.ErrorS(err, "Unable to create Azure Traffic Manager clients") return err } resourceGroupName = cloudConfig.ResourceGroup } klog.V(1).InfoS("Create serviceexport reconciler", "enableTrafficManagerFeature", *enableTrafficManagerFeature) if err := (&serviceexport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, MemberClusterID: mcName, HubNamespace: mcHubNamespace, Recorder: memberMgr.GetEventRecorderFor(serviceexport.ControllerName), EnableTrafficManagerFeature: *enableTrafficManagerFeature, ResourceGroupName: resourceGroupName, AzurePublicIPAddressClient: azurePublicIPAddressClient, }).SetupWithManager(memberMgr); err != nil { klog.ErrorS(err, "Unable to create serviceexport reconciler") return err } klog.V(1).InfoS("Create serviceimport reconciler") if err := (&serviceimport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, MemberClusterID: mcName, HubNamespace: mcHubNamespace, }).SetupWithManager(memberMgr); err != nil { klog.ErrorS(err, "Unable to create serviceimport reconciler") return err } if *isV1Alpha1APIEnabled { klog.V(1).InfoS("Create internalmembercluster (v1alpha1 API) reconciler") if err := (&imcv1alpha1.Reconciler{ MemberClient: memberClient, HubClient: hubClient, AgentType: fleetv1alpha1.ServiceExportImportAgent, }).SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalmembercluster (v1alpha1 API) reconciler") return err } } if *isV1Beta1APIEnabled { klog.V(1).InfoS("Create internalmembercluster (v1beta1 API) reconciler") if err := (&imcv1beta1.Reconciler{ MemberClient: memberClient, HubClient: hubClient, AgentType: clusterv1beta1.ServiceExportImportAgent, }).SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler") return err } } klog.V(1).InfoS("Succeeded to setup controllers with controller manager") return nil } // initAzureNetworkClients initializes the Azure network resource clients, currently only publicIPAddressClient. func initAzureNetworkClients(cloudConfig *azure.CloudConfig) (publicipaddressclient.Interface, error) { authProvider, err := azclient.NewAuthProvider(&cloudConfig.ARMClientConfig, &cloudConfig.AzureAuthConfig) if err != nil { return nil, fmt.Errorf("failed to create Azure auth provider: %w", err) } factoryConfig := &azclient.ClientFactoryConfig{ CloudProviderBackoff: true, SubscriptionID: cloudConfig.SubscriptionID, } options, err := azclient.GetDefaultResourceClientOption(&cloudConfig.ARMClientConfig, factoryConfig) if err != nil { return nil, fmt.Errorf("failed to get default resource client option: %w", err) } if rateLimitPolicy := ratelimit.NewRateLimitPolicy(cloudConfig.Config); rateLimitPolicy != nil { options.ClientOptions.PerCallPolicies = append(options.ClientOptions.PerCallPolicies, rateLimitPolicy) } pipClient, err := publicipaddressclient.New(cloudConfig.SubscriptionID, authProvider.GetAzIdentity(), options) if err != nil { return nil, fmt.Errorf("failed to create Azure PublicIPAddress client: %w", err) } return pipClient, nil }