func()

in pkg/pipe/pipe.go [91:118]


// Start runs the pipe until ctx is cancelled.
//
// It starts the watcher and the k8s registry, launches one Export goroutine
// per workflow that consumes that workflow's events channel, and then fans
// every event received from the watcher out to all workflows. Each workflow's
// filter is evaluated in its own goroutine with a one-minute timeout; an event
// is forwarded to the workflow only when Filter returns false.
//
// Start blocks until ctx is done and always returns nil.
func (p *Pipe) Start(ctx context.Context) error {
	p.Watcher.Start(ctx)

	k8s.Registry.Start(ctx)

	for _, wkfl := range p.workflows {
		go wkfl.exporter.Export(ctx, wkfl.events)
	}

	for {
		select {
		case <-ctx.Done():
			logger.Log.Debugf("stopping pipe")
			return nil
		case e := <-p.Watcher.Events:
			for _, wkfl := range p.workflows {
				// The workflow is passed as an argument so every goroutine
				// works on its own copy regardless of loop-variable semantics.
				go func(w workflow) {
					// Bound only the filter evaluation; the hand-off below is
					// governed by the parent ctx.
					fCtx, cancel := context.WithTimeout(ctx, time.Minute)
					defer cancel()

					if w.filter.Filter(fCtx, e) {
						// Filter returned true: the event is not forwarded.
						return
					}

					// Do not block forever on the send: once ctx is cancelled
					// nothing may be draining w.events any more, and an
					// unguarded send would leak this goroutine.
					select {
					case w.events <- e:
					case <-ctx.Done():
					}
				}(wkfl)
			}
		}
	}
}