func()

in pkg/providers/apisix/provider_init.go [33:181]


// Init collects the IDs of resources derived from Kubernetes objects, collects
// the IDs reported by the provider's list*Cache helpers, compares the two sets
// with findRedundant and passes each result to warnRedundantResources.
//
// The work happens in four steps:
//  1. For every namespace returned by WatchingNamespaces a goroutine
//     translates ApisixRoute, ApisixConsumer and ApisixTls objects and stores
//     the resulting IDs (usernames for consumers) in the *MapK8S sync.Maps.
//  2. The list*Cache helpers fill the matching *MapA6 maps.
//  3. findRedundant is invoked once per resource type with both maps.
//  4. warnRedundantResources is invoked with each comparison result.
//
// Failures while listing or translating Kubernetes objects are logged and the
// affected list or object is skipped. Init neither cancels ctx nor returns
// those errors; only an error from one of the list*Cache helpers is returned.
func (p *apisixProvider) Init(ctx context.Context) error {
	var (
		wg                 sync.WaitGroup
		routeMapK8S        = new(sync.Map)
		streamRouteMapK8S  = new(sync.Map)
		upstreamMapK8S     = new(sync.Map)
		sslMapK8S          = new(sync.Map)
		consumerMapK8S     = new(sync.Map)
		pluginConfigMapK8S = new(sync.Map)

		routeMapA6        = make(map[string]string)
		streamRouteMapA6  = make(map[string]string)
		upstreamMapA6     = make(map[string]string)
		sslMapA6          = make(map[string]string)
		consumerMapA6     = make(map[string]string)
		pluginConfigMapA6 = make(map[string]string)
	)

	namespaces := p.namespaceProvider.WatchingNamespaces()

	// 1. Gather IDs derived from Kubernetes objects, one goroutine per namespace.
	// The namespace is passed as an argument so every goroutine owns its copy.
	for _, key := range namespaces {
		log.Debugf("start to watch namespace: %s", key)
		wg.Add(1)
		go func(ns string) {
			defer wg.Done()

			// ApisixRoute
			// NOTE(review): this List call does not take ns, so every goroutine
			// appears to process the same route list; the Store calls are
			// idempotent, but confirm whether a namespace-scoped lister was
			// intended.
			switch p.common.Config.Kubernetes.APIVersion {
			case config.ApisixV2:
				retRoutes, err := p.common.ApisixRouteLister.V2Lister().List(labels.Everything())
				if err != nil {
					// Best effort: log and skip the route section.
					log.Error(err.Error())
				} else {
					for _, r := range retRoutes {
						tc, err := p.apisixTranslator.GenerateRouteV2DeleteMark(r)
						if err != nil {
							// A route that fails to translate is skipped.
							log.Error(err.Error())
							continue
						}
						// routes
						for _, route := range tc.Routes {
							routeMapK8S.Store(route.ID, route.ID)
						}
						// streamRoutes
						for _, stRoute := range tc.StreamRoutes {
							streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
						}
						// upstreams
						for _, upstream := range tc.Upstreams {
							upstreamMapK8S.Store(upstream.ID, upstream.ID)
						}
						// ssl
						for _, ssl := range tc.SSL {
							sslMapK8S.Store(ssl.ID, ssl.ID)
						}
						// pluginConfigs
						for _, pluginConfig := range tc.PluginConfigs {
							pluginConfigMapK8S.Store(pluginConfig.ID, pluginConfig.ID)
						}
					}
				}
			default:
				log.Errorw("failed to sync ApisixRoute, unexpected version",
					zap.String("version", p.common.Config.Kubernetes.APIVersion),
				)
			}
			// todo ApisixUpstream and ApisixPluginConfig
			// ApisixUpstream and ApisixPluginConfig should be synced with ApisixRoute resource

			switch p.common.Config.Kubernetes.APIVersion {
			case config.ApisixV2:
				// ApisixConsumer: recorded by username.
				retConsumer, err := p.common.ApisixFactory.Apisix().V2().ApisixConsumers().Lister().ApisixConsumers(ns).List(labels.Everything())
				if err != nil {
					// A failed consumer listing must not prevent the ApisixTls
					// section below from running.
					log.Error(err.Error())
				} else {
					for _, con := range retConsumer {
						consumer, err := p.apisixTranslator.TranslateApisixConsumerV2(con)
						if err != nil {
							log.Error(err.Error())
							continue
						}
						consumerMapK8S.Store(consumer.Username, consumer.Username)
					}
				}
				// ApisixTls: shares sslMapK8S with the SSL objects produced by
				// route translation above.
				retSSL, err := p.common.ApisixFactory.Apisix().V2().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
				if err != nil {
					log.Error(err.Error())
				} else {
					for _, s := range retSSL {
						ssl, err := p.apisixTranslator.TranslateSSLV2(s)
						if err != nil {
							log.Error(err.Error())
							continue
						}
						sslMapK8S.Store(ssl.ID, ssl.ID)
					}
				}
			default:
				log.Errorw("failed to sync ApisixConsumer, unexpected version",
					zap.String("version", p.common.Config.Kubernetes.APIVersion),
				)
			}
		}(key)
	}
	// All goroutines must finish before the K8S maps are compared.
	wg.Wait()

	// 2. Fill the A6 maps from the cache helpers; the first failure aborts Init.
	if err := p.listRouteCache(ctx, routeMapA6); err != nil {
		return err
	}
	if err := p.listStreamRouteCache(ctx, streamRouteMapA6); err != nil {
		return err
	}
	if err := p.listUpstreamCache(ctx, upstreamMapA6); err != nil {
		return err
	}
	if err := p.listSSLCache(ctx, sslMapA6); err != nil {
		return err
	}
	if err := p.listConsumerCache(ctx, consumerMapA6); err != nil {
		return err
	}
	if err := p.listPluginConfigCache(ctx, pluginConfigMapA6); err != nil {
		return err
	}

	// 3. Compare each A6 map with its Kubernetes-derived counterpart.
	routeResult := findRedundant(routeMapA6, routeMapK8S)
	streamRouteResult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
	upstreamResult := findRedundant(upstreamMapA6, upstreamMapK8S)
	sslResult := findRedundant(sslMapA6, sslMapK8S)
	consumerResult := findRedundant(consumerMapA6, consumerMapK8S)
	pluginConfigResult := findRedundant(pluginConfigMapA6, pluginConfigMapK8S)

	// 4. Report the comparison results per resource type.
	warnRedundantResources(routeResult, "route")
	warnRedundantResources(streamRouteResult, "streamRoute")
	warnRedundantResources(upstreamResult, "upstream")
	warnRedundantResources(sslResult, "ssl")
	warnRedundantResources(consumerResult, "consumer")
	warnRedundantResources(pluginConfigResult, "pluginConfig")

	return nil
}