pkg/exporter/cmd/eventserver.go (106 lines of code) (raw):

package cmd import ( "context" "github.com/alibaba/kubeskoop/pkg/exporter/probe" "github.com/alibaba/kubeskoop/pkg/exporter/sink" log "github.com/sirupsen/logrus" ) type EventServer struct { *DynamicProbeServer[probe.EventProbe] } func newEventServer(sinks []sink.Sink) (*EventServer, error) { var sinkWrappers []*sinkWrapper done := make(chan struct{}) for _, s := range sinks { sinkWrappers = append(sinkWrappers, &sinkWrapper{ ch: make(chan *probe.Event, 1024), s: s, done: done, }) } probeManager := &EventProbeManager{ sinks: sinkWrappers, sinkChan: make(chan *probe.Event), done: done, } return &EventServer{ DynamicProbeServer: NewDynamicProbeServer[probe.EventProbe](probeManager), }, nil } func (s *EventServer) Start(ctx context.Context, probeConfig []ProbeConfig) error { s.probeManager.(*EventProbeManager).start() return s.DynamicProbeServer.Start(ctx, probeConfig) } func (s *EventServer) Stop(ctx context.Context) error { if err := s.DynamicProbeServer.Stop(ctx); err != nil { return err } s.probeManager.(*EventProbeManager).stop() return nil } type EventProbeManager struct { sinkChan chan *probe.Event sinks []*sinkWrapper done chan struct{} } type sinkWrapper struct { ch chan *probe.Event s sink.Sink done chan struct{} } func (m *EventProbeManager) stop() { log.Infof("probe manager stopped") close(m.done) } func consume(sw *sinkWrapper) { loop: for { select { case evt := <-sw.ch: if err := sw.s.Write(evt); err != nil { log.Errorf("error sink evt %s", err) } case <-sw.done: break loop } } } func (m *EventProbeManager) start() { for _, s := range m.sinks { go consume(s) } go func() { loop: for { select { case evt := <-m.sinkChan: for _, sw := range m.sinks { select { case sw.ch <- evt: break default: log.Errorf("%s is blocked, discard event.", sw.s) } } case <-m.done: break loop } } }() } func (m *EventProbeManager) CreateProbe(config ProbeConfig) (probe.EventProbe, error) { return probe.CreateEventProbe(config.Name, m.sinkChan, config.Args) } func (m *EventProbeManager) StartProbe(ctx context.Context, p probe.EventProbe) error { log.Infof("start event probe %s", p.Name()) return p.Start(ctx) } func (m *EventProbeManager) StopProbe(ctx context.Context, p probe.EventProbe) error { log.Infof("stop event probe %s", p.Name()) state := p.State() if state == probe.ProbeStateStopped || state == probe.ProbeStateStopping || state == probe.ProbeStateFailed { return nil } return p.Stop(ctx) } var _ ProbeManager[probe.MetricsProbe] = &MetricsProbeManager{}