func()

in pkg/pipe/pipe.go [46:89]


// Init rebuilds the pipe's workflows from configs.GlobalConfig and then
// initializes the k8s registry.
//
// For every configured filter it calls filter.Init and, for each exporter
// name the filter lists, verifies that the name is present in
// configs.GlobalConfig.Exporters and that exp.GetExporter returns a non-nil
// exporter. Each exporter is initialized with ctx at most once, no matter how
// many filters reference it, and one workflow (with its own unbuffered event
// channel) is appended per distinct filter/exporter pair.
//
// It returns an error if a referenced exporter is unknown, if an exporter's
// Init fails (wrapped with the exporter name), or if k8s.Registry.Init fails.
// On error p.workflows may hold only the workflows built so far.
func (p *Pipe) Init(ctx context.Context) error {
	logger.Log.Debugf("initializing pipe")

	p.workflows = []workflow{}

	// Exporter names whose Init has already succeeded during this call.
	initialized := map[string]bool{}
	for _, filter := range configs.GlobalConfig.Filters {
		filter.Init()

		// Exporter names already attached to this filter, so a name listed
		// twice in one filter still yields a single workflow.
		attached := make(map[string]struct{}, len(filter.Exporters))

		for _, name := range filter.Exporters {
			if _, ok := configs.GlobalConfig.Exporters[name]; !ok {
				return fmt.Errorf("exporter %v is not defined", name)
			}
			exporter := exp.GetExporter(name)
			if exporter == nil {
				return fmt.Errorf("exporter %v is not defined", name)
			}

			if _, dup := attached[name]; dup {
				continue
			}
			attached[name] = struct{}{}

			// An exporter shared by several filters is initialized only once,
			// but every filter that references it still needs its own
			// workflow; otherwise that filter's events would never be routed.
			if initialized[name] {
				logger.Log.Debugf("exporter %+v has been initialized, skip", name)
			} else {
				if err := exporter.Init(ctx); err != nil {
					return fmt.Errorf("initializing exporter %v: %w", name, err)
				}
				initialized[name] = true
			}

			p.workflows = append(p.workflows, workflow{
				filter:   filter,
				exporter: exporter,
				events:   make(chan *v1.Event),
			})
		}
	}

	if err := k8s.Registry.Init(); err != nil {
		return err
	}

	logger.Log.Debugf("pipe has been initialized")

	return nil
}