in pkg/pipe/pipe.go [91:118]
// Start runs the pipe until ctx is cancelled.
//
// It starts the watcher and the k8s registry, launches one Export goroutine
// per workflow, and then fans every event received from p.Watcher.Events out
// to all workflows. Each workflow's filter is evaluated in its own goroutine
// under a one-minute timeout; an event is forwarded to the workflow's events
// channel only when Filter returns false for it.
//
// Start blocks for the lifetime of ctx and returns nil once ctx is done.
func (p *Pipe) Start(ctx context.Context) error {
	p.Watcher.Start(ctx)
	k8s.Registry.Start(ctx)
	for _, wkfl := range p.workflows {
		go wkfl.exporter.Export(ctx, wkfl.events)
	}
	for {
		select {
		case <-ctx.Done():
			logger.Log.Debugf("stopping pipe")
			return nil
		case e := <-p.Watcher.Events:
			for _, wkfl := range p.workflows {
				// The workflow is passed as an argument so each goroutine
				// works on its own copy rather than the loop variable.
				go func(w workflow) {
					fCtx, cancel := context.WithTimeout(ctx, time.Minute)
					defer cancel()
					if w.filter.Filter(fCtx, e) {
						return
					}
					// Give up on the send when the pipe is shutting down:
					// a bare send would block forever (leaking this
					// goroutine) if nothing is receiving from w.events
					// any more. The parent ctx is used here rather than
					// fCtx so that a slow consumer does not cause events
					// to be dropped after the filter timeout elapses.
					select {
					case w.events <- e:
					case <-ctx.Done():
					}
				}(wkfl)
			}
		}
	}
}