func initModules()

in internal/satellite/boot/boot.go [99:130]


func initModules(cfg *config.SatelliteConfig) (ModuleContainer, error) {
	log.Logger.Infof("satellite is initializing...")
	for _, aCfg := range cfg.Pipes {
		if aCfg.Gatherer == nil || aCfg.Sender == nil || aCfg.Processor == nil {
			return nil, errors.New("gatherer, sender, and processor is required in the namespace config")
		}
	}
	// container contains the modules in each namespace.
	container := make(ModuleContainer)
	for _, aCfg := range cfg.Pipes {
		// the added sequence should follow gather, sender and processor to purpose the booting sequence.
		var modules []api.Module
		g := gatherer.NewGatherer(aCfg.Gatherer)
		s := sender.NewSender(aCfg.Sender)
		p := processor.NewProcessor(aCfg.Processor)
		if err := g.SetProcessor(p); err != nil {
			return nil, err
		}
		if err := p.SetGatherer(g); err != nil {
			return nil, err
		}
		if err := p.SetSender(s); err != nil {
			return nil, err
		}
		if err := s.SetGatherer(g); err != nil {
			return nil, err
		}
		modules = append(modules, g, s, p)
		container[aCfg.PipeCommonConfig.PipeName] = modules
	}
	return container, nil
}