pkg/exporter/cmd/server.go (411 lines of code) (raw):

/* Copyright © 2022 NAME HERE <EMAIL ADDRESS> */ package cmd import ( "context" "encoding/json" "errors" "fmt" "net" "net/http" "net/url" "os" "os/signal" "path" "reflect" "strings" "sync" "sync/atomic" "syscall" "time" task_agent "github.com/alibaba/kubeskoop/pkg/exporter/task-agent" "github.com/fsnotify/fsnotify" "github.com/alibaba/kubeskoop/pkg/exporter/sink" _ "net/http" //for golangci-lint _ "net/http/pprof" //for golangci-lint once more "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "github.com/alibaba/kubeskoop/pkg/exporter/probe" gops "github.com/google/gops/agent" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) // serverCmd represents the server command var ( serverCmd = &cobra.Command{ Use: "server", Short: "start inspector server", Run: func(_ *cobra.Command, _ []string) { insp := &inspServer{ configPath: configPath, ctx: context.Background(), } log.Infof("start with config file %s", configPath) cfg, err := loadConfig(insp.configPath) if err != nil { log.Errorf("merge config err: %v", err) return } if debug { cfg.DebugMode = true } if cfg.DebugMode { log.SetLevel(log.DebugLevel) } if err = nettop.StartCache(insp.ctx, sidecar); err != nil { log.Errorf("failed start cache: %v", err) return } defer nettop.StopCache() if cfg.EnableController { if cfg.ControllerAddr == "" { log.Infof("controller address is empty, use dns:controller:10263 as default") cfg.ControllerAddr = "dns:controller:10263" } if err := task_agent.NewTaskAgent(cfg.ControllerAddr).Run(); err != nil { log.Errorf("failed start agent: %v", err) return } } // block here err = insp.start(cfg) if err != nil { log.Infof("start server err: %v", err) return } }, } configPath = "/etc/config/config.yaml" ) type ProbeManager[T probe.Probe] interface { CreateProbe(config ProbeConfig) (T, error) StartProbe(ctx context.Context, probe T) error StopProbe(ctx context.Context, probe T) error } type DynamicProbeServer[T probe.Probe] struct { lock sync.Mutex probeManager ProbeManager[T] lastConfig []ProbeConfig probes map[string]T } func NewDynamicProbeServer[T probe.Probe](probeManager ProbeManager[T]) *DynamicProbeServer[T] { return &DynamicProbeServer[T]{ probeManager: probeManager, probes: make(map[string]T), } } func (s *DynamicProbeServer[T]) probeChanges(config []ProbeConfig) (toAdd []ProbeConfig, toClose []string) { toMap := func(configs []ProbeConfig) map[string]ProbeConfig { ret := make(map[string]ProbeConfig) for _, probeConfig := range configs { ret[probeConfig.Name] = probeConfig } return ret } lastConfigMap := toMap(s.lastConfig) configMap := toMap(config) for name := range lastConfigMap { if _, ok := configMap[name]; !ok { toClose = append(toClose, name) } } for name, probeConf := range configMap { lastConf, ok := lastConfigMap[name] if !ok { toAdd = append(toAdd, probeConf) } else { if !reflect.DeepEqual(lastConf, probeConf) { toAdd = append(toAdd, probeConf) toClose = append(toClose, name) } } } return toAdd, toClose } func (s *DynamicProbeServer[T]) Start(ctx context.Context, config []ProbeConfig) error { return s.Reload(ctx, config) } func (s *DynamicProbeServer[T]) Stop(ctx context.Context) error { s.lock.Lock() defer s.lock.Unlock() for _, probe := range s.probes { if err := s.probeManager.StopProbe(ctx, probe); err != nil { return err } } return nil } func marshalProbeConfig(config []ProbeConfig) string { s, _ := json.Marshal(config) return string(s) } func (s *DynamicProbeServer[T]) Reload(ctx context.Context, config []ProbeConfig) error { s.lock.Lock() defer s.lock.Unlock() log.Infof("reload config, old config: %s, new config: %s", marshalProbeConfig(s.lastConfig), marshalProbeConfig(config)) toAdd, toClose := s.probeChanges(config) var toAddProbes []T for _, probeConfig := range toAdd { probe, err := s.probeManager.CreateProbe(probeConfig) if err != nil { return fmt.Errorf("error create probe %s: %w", probeConfig.Name, err) } toAddProbes = append(toAddProbes, probe) } for _, name := range toClose { probe, ok := s.probes[name] if !ok { continue } if err := s.probeManager.StopProbe(ctx, probe); err != nil { return fmt.Errorf("failed stop probe %s, %w", name, err) } } s.lastConfig = config for _, probe := range toAddProbes { s.probes[probe.Name()] = probe if err := s.probeManager.StartProbe(ctx, probe); err != nil { log.Errorf("failed start probe %s, err: %v", probe.Name(), err) } } return nil } type probeState struct { Name string `json:"name"` State string `json:"state"` } func (s *DynamicProbeServer[T]) listProbes() []probeState { var ret []probeState for name, probe := range s.probes { ret = append(ret, probeState{Name: name, State: probe.State().String()}) } return ret } func init() { rootCmd.AddCommand(serverCmd) serverCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "/etc/config/config.yaml", "config file path") } type inspServer struct { configPath string ctx context.Context metricsServer *MetricsServer eventServer *EventServer } func (i *inspServer) WatchConfig(done <-chan struct{}) error { watcher, err := fsnotify.NewWatcher() if err != nil { return err } if err = watcher.Add(i.configPath); err != nil { return err } var delaying atomic.Bool go func() { for { select { case <-watcher.Events: if delaying.CompareAndSwap(false, true) { time.AfterFunc(1*time.Second, func() { delaying.Store(false) if err = i.reload(); err != nil { log.Errorf("failed reload config %s: %v", i.configPath, err) } }) } case err = <-watcher.Errors: log.Errorf("error watch %s: %v", i.configPath, err) case <-done: _ = watcher.Close() return } } }() return nil } func (i *inspServer) reload() error { cfg, err := loadConfig(i.configPath) if err != nil { return err } ctx := context.TODO() err = i.metricsServer.Reload(ctx, cfg.MetricsConfig.Probes) if err != nil { return fmt.Errorf("reload metric server error: %s", err) } err = i.eventServer.Reload(ctx, cfg.EventConfig.Probes) if err != nil { return fmt.Errorf("reload event server error: %s", err) } return nil } func (i *inspServer) createListener(cfg *InspServerConfig) (net.Listener, error) { if cfg.Address == "" { if cfg.Port != 0 { cfg.Address = fmt.Sprintf(":%d", cfg.Port) log.Warningf("port is derepcated, use address instead") } else { return nil, fmt.Errorf("listen address is empty") } } rawAddr := cfg.Address if !strings.Contains(rawAddr, "://") { log.Infof("address contains no protocol part, use tcp:// by default") rawAddr = "tcp://" + rawAddr } u, err := url.Parse(rawAddr) if err != nil { return nil, fmt.Errorf("invalid address %s, valid format is [protocol://]addr[:port]", rawAddr) } protocol := u.Scheme addr := "" switch protocol { case "unix": addr = u.Path if _, err = os.Stat(addr); err != nil { if !errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("failed stat sock file %s: %w", addr, err) } parent := path.Dir(addr) if err = os.MkdirAll(parent, 0o755); err != nil { return nil, fmt.Errorf("failed create director %s: %w", parent, err) } } else { //socket file exists, just remove it _ = os.Remove(addr) } case "tcp": addr = u.Host default: return nil, fmt.Errorf("unsupported protocol %s, only `tcp` and 'unix' are supported", protocol) } return net.Listen(protocol, addr) } func (i *inspServer) newHTTPServer(cfg *InspServerConfig) (*http.Server, net.Listener, error) { http.Handle("/metrics", i.metricsServer) http.Handle("/", http.HandlerFunc(defaultPage)) http.Handle("/status", http.HandlerFunc(i.statusPage)) if cfg.DebugMode { reg := prometheus.NewRegistry() reg.MustRegister( collectors.NewGoCollector(), collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), ) http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) } listener, err := i.createListener(cfg) if err != nil { return nil, nil, fmt.Errorf("failed create listener: %w", err) } log.Infof("inspector start metric server, listenAddr: %s", listener.Addr()) return &http.Server{}, listener, nil } func (i *inspServer) start(cfg *InspServerConfig) error { if err := gops.Listen(gops.Options{}); err != nil { log.Infof("start gops err: %v", err) } var err error ctx := context.TODO() err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels) if err != nil { return fmt.Errorf("failed init additional labels: %w", err) } i.metricsServer, err = newMetricsServer() if err != nil { return fmt.Errorf("failed create metrics server: %w", err) } if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil { return fmt.Errorf("failed start metrics server: %w", err) } defer func() { _ = i.metricsServer.Stop(ctx) }() sinks, err := createSink(cfg.EventConfig.EventSinks) if err != nil { return fmt.Errorf("failed create sinks, err: %w", err) } if len(sinks) != len(cfg.EventConfig.EventSinks) { log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks)) } i.eventServer, err = newEventServer(sinks) if err != nil { return fmt.Errorf("failed create event server: %w", err) } if err = i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil { return fmt.Errorf("failed start event server: %w", err) } defer func() { _ = i.eventServer.Stop(ctx) }() done := make(chan struct{}) if err = i.WatchConfig(done); err != nil { log.Errorf("failed watch config, dynamic load would not work: %v", err) } srv, listener, err := i.newHTTPServer(cfg) if err != nil { return fmt.Errorf("failed start http server: %w", err) } serverClosedChan := make(chan struct{}) serverClosed := false go func() { if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed { log.Errorf("server error: %v", err) } close(serverClosedChan) serverClosed = true }() WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) close(done) if !serverClosed { _ = srv.Shutdown(ctx) } _ = listener.Close() return nil } func createSink(sinkConfigs []EventSinkConfig) ([]sink.Sink, error) { var ret []sink.Sink for _, config := range sinkConfigs { s, err := sink.CreateSink(config.Name, config.Args) if err != nil { log.Errorf("failed create sink %s, err: %v", config.Name, err) continue } ret = append(ret, s) } return ret, nil } func WaitSignals(done <-chan struct{}, sgs ...os.Signal) { s := make(chan os.Signal, 1) signal.Notify(s, sgs...) select { case sig := <-s: log.Warnf("recive signal %s, stopping", sig.String()) return case <-done: log.Warnf("recive server close signal") } } func defaultPage(w http.ResponseWriter, _ *http.Request) { // nolint w.Write([]byte(`<html> <head><title>Net Exporter</title></head> <body> <h1>Net Exporter</h1> <p><a href="/metrics">Metrics</a></p> </body> </html>`)) } func (i *inspServer) statusPage(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) res := map[string]interface{}{ "inuse_probes": map[string][]probeState{ "metrics": i.metricsServer.listProbes(), "event": i.eventServer.listProbes(), }, "available_probes": map[string][]string{ "event": probe.ListEventProbes(), "metrics": probe.ListMetricsProbes(), }, } rawText, err := json.Marshal(res) if err != nil { log.Errorf("failed marshal probe status: %v", err) } w.Write(rawText) // nolint }