/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
// Binary mcs-controller-manager watches the MultiClusterService CRD to expose multi-cluster services via a load balancer.
// The controller can be installed in either the hub cluster or a member cluster.
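//
// Example invocation (illustrative only; the values shown are the default flag values defined in this file):
//
//	mcs-controller-manager \
//		--leader-elect=true \
//		--leader-election-namespace=fleet-system \
//		--enable-v1alpha1-apis=true \
//		--enable-v1beta1-apis=false \
//		--metrics-bind-address=:8090 \
//		--health-probe-bind-address=:8091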
package main
import (
	"context"
	"flag"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"k8s.io/apimachinery/pkg/util/rand"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
	// to ensure that exec-entrypoint and run can make use of them.
	_ "k8s.io/client-go/plugin/pkg/client/auth"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/healthz"
	//+kubebuilder:scaffold:imports

	clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
	fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"

	fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
	"go.goms.io/fleet-networking/pkg/common/hubconfig"
	imcv1alpha1 "go.goms.io/fleet-networking/pkg/controllers/member/internalmembercluster/v1alpha1"
	imcv1beta1 "go.goms.io/fleet-networking/pkg/controllers/member/internalmembercluster/v1beta1"
	"go.goms.io/fleet-networking/pkg/controllers/multiclusterservice"
)
var (
scheme = runtime.NewScheme()
	hubMetricsAddr = flag.String("hub-metrics-bind-address", ":8080", "The address the hub controller manager metrics endpoint binds to.")
	hubProbeAddr = flag.String("hub-health-probe-bind-address", ":8081", "The address the hub controller manager health probe endpoint binds to.")
	metricsAddr = flag.String("metrics-bind-address", ":8090", "The address the member controller manager metrics endpoint binds to.")
	probeAddr = flag.String("health-probe-bind-address", ":8091", "The address the member controller manager health probe endpoint binds to.")
enableLeaderElection = flag.Bool("leader-elect", true,
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
leaderElectionNamespace = flag.String("leader-election-namespace", "fleet-system", "The namespace in which the leader election resource will be created.")
	tlsClientInsecure = flag.Bool("tls-insecure", false, "Enable the TLSClientConfig.Insecure property. This makes the connection to the hub insecure and should be set to 'true' for testing purposes only.")
fleetSystemNamespace = flag.String("fleet-system-namespace", "fleet-system", "The reserved system namespace used by fleet.")
isV1Alpha1APIEnabled = flag.Bool("enable-v1alpha1-apis", true, "If set, the agents will watch for the v1alpha1 APIs.")
isV1Beta1APIEnabled = flag.Bool("enable-v1beta1-apis", false, "If set, the agents will watch for the v1beta1 APIs.")
)
func init() {
klog.InitFlags(nil)
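	// Register client-go's built-in types and the fleet / fleet-networking API types with the
	// shared scheme used by both the hub and member managers.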
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(fleetnetv1alpha1.AddToScheme(scheme))
utilruntime.Must(fleetv1alpha1.AddToScheme(scheme))
utilruntime.Must(clusterv1beta1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
handleExitFunc := func() {
klog.Flush()
}
exitWithErrorFunc := func() {
handleExitFunc()
os.Exit(1)
}
defer handleExitFunc()
flag.VisitAll(func(f *flag.Flag) {
klog.InfoS("flag:", "name", f.Name, "value", f.Value)
})
// Set up controller-runtime logger
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
memberConfig, memberOptions := prepareMemberParameters()
hubConfig, hubOptions, err := prepareHubParameters(memberConfig)
if err != nil {
exitWithErrorFunc()
}
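	// This binary runs two controller managers side by side: the hub manager runs the
	// InternalMemberCluster reconcilers registered against the hub cluster, while the member
	// manager runs the MultiClusterService reconciler against the member cluster itself.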
// Setup hub controller manager.
hubMgr, err := ctrl.NewManager(hubConfig, *hubOptions)
if err != nil {
		klog.ErrorS(err, "Unable to create hub manager")
exitWithErrorFunc()
}
if err := hubMgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
klog.ErrorS(err, "Unable to set up health check for hub manager")
exitWithErrorFunc()
}
if err := hubMgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
klog.ErrorS(err, "Unable to set up ready check for hub manager")
exitWithErrorFunc()
}
// Setup member controller manager.
memberMgr, err := ctrl.NewManager(memberConfig, *memberOptions)
if err != nil {
		klog.ErrorS(err, "Unable to create member manager")
exitWithErrorFunc()
}
if err := memberMgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
klog.ErrorS(err, "Unable to set up health check for member manager")
exitWithErrorFunc()
}
if err := memberMgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
klog.ErrorS(err, "Unable to set up ready check for member manager")
exitWithErrorFunc()
}
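	// A single cancellable context drives both managers: it is cancelled when a termination
	// signal arrives or when either manager exits, so the other manager shuts down as well.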
ctx, cancel := context.WithCancel(context.Background())
	klog.V(1).InfoS("Set up controllers with the controller managers")
	if err := setupControllersWithManager(ctx, hubMgr, memberMgr); err != nil {
		klog.ErrorS(err, "Unable to set up controllers with the managers")
exitWithErrorFunc()
}
	// Both managers should stop if either of them exits or a SIGTERM/SIGINT signal is received.
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
		klog.Info("Received termination signal, shutting down the MultiClusterService agent")
cancel()
}()
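	// Start each manager in its own goroutine and wait for both to return. Start errors are
	// collected (under a mutex, since both goroutines may append) so the process can exit
	// with a non-zero code if either manager fails.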
	var (
		startErrorsMu sync.Mutex
		startErrors   []error
	)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
klog.V(1).InfoS("Starting hub manager for MultiClusterService agent")
defer func() {
wg.Done()
klog.V(1).InfoS("Shutting down hub manager")
cancel()
}()
if err := hubMgr.Start(ctx); err != nil {
klog.ErrorS(err, "Failed to start hub manager")
			startErrorsMu.Lock()
			startErrors = append(startErrors, err)
			startErrorsMu.Unlock()
}
}()
wg.Add(1)
go func() {
klog.V(1).InfoS("Starting member manager for MultiClusterService agent")
defer func() {
klog.V(1).InfoS("Shutting down member manager")
wg.Done()
cancel()
}()
		if err := memberMgr.Start(ctx); err != nil {
			klog.ErrorS(err, "Failed to start member manager")
			startErrorsMu.Lock()
			startErrors = append(startErrors, err)
			startErrorsMu.Unlock()
}
}()
wg.Wait()
if len(startErrors) > 0 {
exitWithErrorFunc()
}
}
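
// prepareHubParameters builds the hub cluster kubeconfig and the controller manager options
// for the hub manager. The hub manager's cache is restricted to the member cluster's reserved
// namespace on the hub, and leader election uses the member cluster config (LeaderElectionConfig),
// so the election lease is created on the member cluster in the leader election namespace.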
func prepareHubParameters(memberConfig *rest.Config) (*rest.Config, *ctrl.Options, error) {
hubConfig, err := hubconfig.PrepareHubConfig(*tlsClientInsecure)
if err != nil {
klog.ErrorS(err, "Failed to get hub config")
return nil, nil, err
}
mcHubNamespace, err := hubconfig.FetchMemberClusterNamespace()
if err != nil {
klog.ErrorS(err, "Failed to get member cluster hub namespace")
return nil, nil, err
}
hubOptions := &ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: *hubMetricsAddr,
},
WebhookServer: webhook.NewServer(webhook.Options{
Port: 9443,
}),
HealthProbeBindAddress: *hubProbeAddr,
LeaderElection: *enableLeaderElection,
LeaderElectionID: "2bf2b407.mcs.hub.networking.fleet.azure.com",
		LeaderElectionNamespace: *leaderElectionNamespace, // This requires access to the "leases" resource in the "coordination.k8s.io" API group within leaderElectionNamespace.
		LeaderElectionConfig: memberConfig,
		// Restrict the manager's cache to objects in the member cluster's reserved namespace on the hub.
Cache: cache.Options{
DefaultNamespaces: map[string]cache.Config{
mcHubNamespace: {},
},
},
}
return hubConfig, hubOptions, nil
}
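
// prepareMemberParameters returns the member cluster kubeconfig resolved by controller-runtime
// (ctrl.GetConfigOrDie) and the controller manager options for the member manager.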
func prepareMemberParameters() (*rest.Config, *ctrl.Options) {
memberOpts := &ctrl.Options{
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: *metricsAddr,
},
WebhookServer: webhook.NewServer(webhook.Options{
Port: 8443,
}),
HealthProbeBindAddress: *probeAddr,
LeaderElection: *enableLeaderElection,
LeaderElectionNamespace: *leaderElectionNamespace,
LeaderElectionID: "2bf2b407.mcs.member.networking.fleet.azure.com",
}
return ctrl.GetConfigOrDie(), memberOpts
}
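
// setupControllersWithManager registers the MultiClusterService reconciler with the member
// manager and, depending on which API versions are enabled, the v1alpha1 and/or v1beta1
// InternalMemberCluster reconcilers with the hub manager.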
func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Manager) error {
	klog.V(1).InfoS("Begin to set up controllers with the controller managers")
memberClient := memberMgr.GetClient()
hubClient := hubMgr.GetClient()
klog.V(1).InfoS("Create multiclusterservice reconciler")
if err := (&multiclusterservice.Reconciler{
Client: memberClient,
Scheme: memberMgr.GetScheme(),
FleetSystemNamespace: *fleetSystemNamespace,
Recorder: memberMgr.GetEventRecorderFor(multiclusterservice.ControllerName),
}).SetupWithManager(memberMgr); err != nil {
klog.ErrorS(err, "Unable to create multiclusterservice reconciler")
return err
}
if *isV1Alpha1APIEnabled {
klog.V(1).InfoS("Create internalmembercluster (v1alpha1 API) reconciler")
if err := (&imcv1alpha1.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
AgentType: fleetv1alpha1.MultiClusterServiceAgent,
}).SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalmembercluster (v1alpha1 API) reconciler")
return err
}
}
if *isV1Beta1APIEnabled {
klog.V(1).InfoS("Create internalmembercluster (v1beta1 API) reconciler")
if err := (&imcv1beta1.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
AgentType: clusterv1beta1.MultiClusterServiceAgent,
}).SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler")
return err
}
}
	klog.V(1).InfoS("Succeeded in setting up controllers with the controller managers")
return nil
}