pkg/controller/service/controller.go (94 lines of code) (raw):

package service import ( "context" "io" "sync" "time" "github.com/alibaba/kubeskoop/pkg/controller/db" exporter "github.com/alibaba/kubeskoop/pkg/exporter/cmd" lokiwrapper "github.com/alibaba/kubeskoop/pkg/exporter/loki" "github.com/alibaba/kubeskoop/pkg/controller/diagnose" "github.com/alibaba/kubeskoop/pkg/controller/rpc" skoopContext "github.com/alibaba/kubeskoop/pkg/skoop/context" "github.com/prometheus/client_golang/api" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "k8s.io/client-go/kubernetes" ) const ( Namespace = "kubeskoop" ExporterConfigMap = "kubeskoop-config" ) type ControllerService interface { rpc.ControllerRegisterServiceServer GetAgentList() []*rpc.AgentInfo Capture(ctx context.Context, capture *CaptureArgs) (int, error) CaptureList(ctx context.Context) (map[int][]*CaptureTaskResult, error) QueryRangeEvent(ctx context.Context, start, end time.Time, filters map[string][]string, limit int) ([]Event, error) Diagnose(ctx context.Context, args *skoopContext.TaskConfig) (int64, error) DiagnoseList(ctx context.Context) ([]DiagnoseTaskResult, error) DownloadCaptureFile(ctx context.Context, id int) (string, int64, io.ReadCloser, error) PodList(ctx context.Context) ([]*Pod, error) NodeList(ctx context.Context) ([]*Node, error) NamespaceList(ctx context.Context) ([]string, error) QueryPrometheus(ctx context.Context, query string, ts time.Time) (model.Value, promv1.Warnings, error) GetPodNodeInfoFromMetrics(ctx context.Context, ts time.Time) (model.Vector, model.Vector, error) PingMesh(ctx context.Context, pingmesh *PingMeshArgs) (*PingMeshResult, error) GetExporterConfig(ctx context.Context) (*exporter.InspServerConfig, error) UpdateExporterConfig(ctx context.Context, cfg *exporter.InspServerConfig) error } type Config struct { Namespace string `yaml:"namespace"` KubeConfig string `yaml:"kubeConfig"` Prometheus string `yaml:"prometheus"` Loki string `yaml:"loki"` DB db.Config `yaml:"database"` Diagnose diagnose.Config `yaml:"diagnose"` } func NewControllerService(k8sClient *kubernetes.Clientset, config *Config) (ControllerService, error) { if config.Namespace == "" { config.Namespace = Namespace } ctrl := &controller{ taskWatcher: sync.Map{}, resultWatchers: sync.Map{}, Namespace: config.Namespace, ConfigMapName: ExporterConfigMap, } ctrl.k8sClient = k8sClient //init db if err := db.InitializeDB(&config.DB); err != nil { return nil, err } if config.Prometheus != "" { promClient, err := api.NewClient(api.Config{ Address: config.Prometheus, }) if err != nil { return nil, err } ctrl.promClient = promClient } if config.Loki != "" { lokiClient, err := lokiwrapper.NewClient(config.Loki) if err != nil { return nil, err } ctrl.lokiClient = lokiClient } //if diagnose kubeconfig is not set, use controller's kubeconfig as default if config.Diagnose.KubeConfig == "" { config.Diagnose.KubeConfig = config.KubeConfig } ctrl.diagnostor = diagnose.NewDiagnoseController(ctrl.Namespace, &config.Diagnose) return ctrl, nil } type controller struct { rpc.UnimplementedControllerRegisterServiceServer diagnostor diagnose.Controller k8sClient *kubernetes.Clientset taskWatcher sync.Map resultWatchers sync.Map promClient api.Client lokiClient *lokiwrapper.Client Namespace string ConfigMapName string }