in pkg/pipe/pipe.go [46:89]
func (p *Pipe) Init(ctx context.Context) error {
logger.Log.Debugf("initializing pipe")
p.workflows = []workflow{}
initialized := map[string]bool{}
for _, filter := range configs.GlobalConfig.Filters {
filter.Init()
for _, name := range filter.Exporters {
if _, ok := configs.GlobalConfig.Exporters[name]; !ok {
return fmt.Errorf("exporter %v is not defined", filter.Exporters)
}
exporter := exp.GetExporter(name)
if exporter == nil {
return fmt.Errorf("exporter %v is not defined", filter.Exporters)
}
if initialized[name] {
logger.Log.Debugf("exporter %+v has been initialized, skip", name)
continue
}
if err := exporter.Init(ctx); err != nil {
return err
}
initialized[name] = true
events := make(chan *v1.Event)
p.workflows = append(p.workflows, workflow{
filter: filter,
exporter: exporter,
events: events,
})
}
}
if err := k8s.Registry.Init(); err != nil {
return err
}
logger.Log.Debugf("pipe has been initialized")
return nil
}