pkg/datasource/k8s/k8s.go (181 lines of code) (raw):

package k8s import ( "strings" "github.com/alibaba/sentinel-golang/logging" crdv1alpha1 "github.com/alibaba/sentinel-golang/pkg/datasource/k8s/api/v1alpha1" "github.com/alibaba/sentinel-golang/pkg/datasource/k8s/controllers" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/reconcile" // +kubebuilder:scaffold:imports ) var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") ) func init() { _ = clientgoscheme.AddToScheme(scheme) _ = crdv1alpha1.AddToScheme(scheme) // +kubebuilder:scaffold:scheme } type CRDType int32 const ( FlowRulesCRD CRDType = iota IsolationRulesCRD CircuitBreakerRulesCRD HotspotRulesCRD SystemRulesCRD ) func (c CRDType) String() string { switch c { case FlowRulesCRD: return "FlowRulesCRD" case CircuitBreakerRulesCRD: return "CircuitBreakerRulesCRD" case HotspotRulesCRD: return "HotspotRulesCRD" case SystemRulesCRD: return "SystemRulesCRD" default: return "Undefined" } } type DataSource struct { crdManager ctrl.Manager controllers map[CRDType]reconcile.Reconciler namespace string stopChan chan struct{} } // NewDataSource creates a K8S DataSource with given namespace // All Controllers take effective only when match namespace. func NewDataSource(namespace string) (*DataSource, error) { ctrl.SetLogger(&k8SLogger{ l: logging.GetGlobalLogger(), level: logging.GetGlobalLoggerLevel(), names: make([]string, 0), keysAndValues: make([]interface{}, 0), }) k8sConfig, err := ctrl.GetConfig() if err != nil { return nil, err } mgr, err := ctrl.NewManager(k8sConfig, ctrl.Options{ Scheme: scheme, // disable metric server MetricsBindAddress: "0", HealthProbeBindAddress: "0", LeaderElection: false, }) if err != nil { setupLog.Error(err, "unable to start manager") return nil, err } k := &DataSource{ crdManager: mgr, controllers: make(map[CRDType]reconcile.Reconciler, 4), namespace: namespace, stopChan: make(chan struct{}), } return k, nil } // RegisterController register given type crd and crd name // For each type CRD can only register once. func (k *DataSource) RegisterController(crd CRDType, crName string) error { if len(strings.TrimSpace(crName)) == 0 { return errors.New("empty crd name") } _, exist := k.controllers[crd] if exist { return errors.Errorf("duplicated register crd for %s", crd.String()) } switch crd { case FlowRulesCRD: controller := &controllers.FlowRulesReconciler{ Client: k.crdManager.GetClient(), Logger: ctrl.Log.WithName("controllers").WithName("FlowRules"), Scheme: k.crdManager.GetScheme(), Namespace: k.namespace, ExpectedCrName: crName, } err := controller.SetupWithManager(k.crdManager) if err != nil { return err } k.controllers[FlowRulesCRD] = controller setupLog.Info("succeed to register FlowRulesCRD Controller.") return nil case IsolationRulesCRD: controller := &controllers.IsolationRulesReconciler{ Client: k.crdManager.GetClient(), Logger: ctrl.Log.WithName("controllers").WithName("IsolationRules"), Scheme: k.crdManager.GetScheme(), Namespace: k.namespace, ExpectedCrName: crName, } err := controller.SetupWithManager(k.crdManager) if err != nil { return err } k.controllers[IsolationRulesCRD] = controller setupLog.Info("succeed to register IsolationRulesCRD Controller.") return nil case CircuitBreakerRulesCRD: controller := &controllers.CircuitBreakerRulesReconciler{ Client: k.crdManager.GetClient(), Logger: ctrl.Log.WithName("controllers").WithName("CircuitBreakerRules"), Scheme: k.crdManager.GetScheme(), Namespace: k.namespace, ExpectedCrName: crName, } err := controller.SetupWithManager(k.crdManager) if err != nil { return err } k.controllers[CircuitBreakerRulesCRD] = controller setupLog.Info("succeed to register CircuitBreakerRulesCRD Controller.") return nil case HotspotRulesCRD: controller := &controllers.HotspotRulesReconciler{ Client: k.crdManager.GetClient(), Logger: ctrl.Log.WithName("controllers").WithName("HotspotRules"), Scheme: k.crdManager.GetScheme(), Namespace: k.namespace, ExpectedCrName: crName, } err := controller.SetupWithManager(k.crdManager) if err != nil { return err } k.controllers[HotspotRulesCRD] = controller setupLog.Info("succeed to register HotspotRulesCRD Controller.") return nil case SystemRulesCRD: controller := &controllers.SystemRulesReconciler{ Client: k.crdManager.GetClient(), Logger: ctrl.Log.WithName("controllers").WithName("SystemRules"), Scheme: k.crdManager.GetScheme(), Namespace: k.namespace, ExpectedCrName: crName, } err := controller.SetupWithManager(k.crdManager) if err != nil { return err } k.controllers[SystemRulesCRD] = controller setupLog.Info("succeed to register SystemRulesCRD Controller.") return nil default: return errors.Errorf("unsupported CRDType: %d", int(crd)) } } // Close exit the K8S DataSource func (k *DataSource) Close() error { k.stopChan <- struct{}{} return nil } // Run runs the k8s DataSource func (k *DataSource) Run() error { // +kubebuilder:scaffold:builder go util.RunWithRecover(func() { setupLog.Info("starting manager") if err := k.crdManager.Start(k.stopChan); err != nil { setupLog.Error(err, "problem running manager") } setupLog.Info("k8s datasource exited") }) return nil }