in pkg/pipe/pipe.go [91:118]
// Start runs the pipe until ctx is cancelled.
//
// It starts the watcher and the k8s registry, launches one exporter goroutine
// per workflow, and then dispatches every event received from the watcher to
// every workflow. For each (event, workflow) pair the workflow's filter is
// evaluated with a one-minute timeout context; events for which the filter
// reports false are forwarded to that workflow's events channel.
//
// Start blocks and returns nil once ctx is done.
func (p *Pipe) Start(ctx context.Context) error {
	p.Watcher.Start(ctx)
	k8s.Registry.Start(ctx)
	for _, wkfl := range p.workflows {
		go wkfl.exporter.Export(ctx, wkfl.events)
	}
	for {
		select {
		case <-ctx.Done():
			logger.Log.Debugf("stopping pipe")
			return nil
		case e := <-p.Watcher.Events:
			for _, wkfl := range p.workflows {
				// The workflow is passed as an argument so each goroutine
				// works on its own copy regardless of loop-variable semantics.
				go func(w workflow) {
					// Bound only the filter evaluation; the forward below is
					// governed by the pipe's own context.
					fCtx, cancel := context.WithTimeout(ctx, time.Minute)
					defer cancel()
					if w.filter.Filter(fCtx, e) {
						return
					}
					// Forward the event, but give up when the pipe is
					// stopping: a bare send would block forever (leaking this
					// goroutine) if nothing reads w.events after cancellation.
					select {
					case w.events <- e:
					case <-ctx.Done():
					}
				}(wkfl)
			}
		}
	}
}