in pkg/providers/apisix/provider_init.go [33:181]
// Init cross-checks the resources declared in Kubernetes against the cached
// APISIX side and reports cached entries that have no Kubernetes counterpart.
//
// It works in four steps:
//  1. For every watched namespace, concurrently collect the IDs of the
//     routes, stream routes, upstreams, SSLs and plugin configs translated
//     from ApisixRoute objects, the usernames translated from ApisixConsumer
//     objects and the SSL IDs translated from ApisixTls objects.
//  2. Fill the *MapA6 maps through the list*Cache helpers.
//  3. Diff both sides with findRedundant.
//  4. Hand every diff to warnRedundantResources.
//
// Failures while listing or translating Kubernetes objects are only logged
// and the affected object is skipped; they never abort Init. The only errors
// returned to the caller are those produced by the list*Cache helpers.
func (p *apisixProvider) Init(ctx context.Context) error {
	var (
		wg sync.WaitGroup

		// Kubernetes side: written concurrently by the per-namespace
		// goroutines below, hence sync.Map.
		routeMapK8S        = new(sync.Map)
		streamRouteMapK8S  = new(sync.Map)
		upstreamMapK8S     = new(sync.Map)
		sslMapK8S          = new(sync.Map)
		consumerMapK8S     = new(sync.Map)
		pluginConfigMapK8S = new(sync.Map)

		// APISIX side: only touched after wg.Wait(), so plain maps suffice.
		routeMapA6        = make(map[string]string)
		streamRouteMapA6  = make(map[string]string)
		upstreamMapA6     = make(map[string]string)
		sslMapA6          = make(map[string]string)
		consumerMapA6     = make(map[string]string)
		pluginConfigMapA6 = make(map[string]string)
	)

	// collectRoutes records the IDs of everything translated from the
	// ApisixRoute objects returned by the v2 lister.
	// NOTE(review): this lister call takes no namespace argument, so each
	// namespace goroutine appears to repeat the same listing — confirm
	// whether that is intended.
	collectRoutes := func() {
		routes, err := p.common.ApisixRouteLister.V2Lister().List(labels.Everything())
		if err != nil {
			log.Error(err.Error())
			return
		}
		for _, r := range routes {
			tc, err := p.apisixTranslator.GenerateRouteV2DeleteMark(r)
			if err != nil {
				// Skip the object that failed to translate, keep the rest.
				log.Error(err.Error())
				continue
			}
			for _, route := range tc.Routes {
				routeMapK8S.Store(route.ID, route.ID)
			}
			for _, stRoute := range tc.StreamRoutes {
				streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
			}
			for _, upstream := range tc.Upstreams {
				upstreamMapK8S.Store(upstream.ID, upstream.ID)
			}
			for _, ssl := range tc.SSL {
				sslMapK8S.Store(ssl.ID, ssl.ID)
			}
			for _, pluginConfig := range tc.PluginConfigs {
				pluginConfigMapK8S.Store(pluginConfig.ID, pluginConfig.ID)
			}
		}
	}

	// collectConsumers records the username of every ApisixConsumer in ns
	// that translates successfully.
	collectConsumers := func(ns string) {
		consumers, err := p.common.ApisixFactory.Apisix().V2().ApisixConsumers().Lister().ApisixConsumers(ns).List(labels.Everything())
		if err != nil {
			log.Error(err.Error())
			return
		}
		for _, con := range consumers {
			consumer, err := p.apisixTranslator.TranslateApisixConsumerV2(con)
			if err != nil {
				log.Error(err.Error())
				continue
			}
			consumerMapK8S.Store(consumer.Username, consumer.Username)
		}
	}

	// collectTLS records the SSL ID of every ApisixTls in ns that translates
	// successfully.
	collectTLS := func(ns string) {
		tlses, err := p.common.ApisixFactory.Apisix().V2().ApisixTlses().Lister().ApisixTlses(ns).List(labels.Everything())
		if err != nil {
			log.Error(err.Error())
			return
		}
		for _, s := range tlses {
			ssl, err := p.apisixTranslator.TranslateSSLV2(s)
			if err != nil {
				log.Error(err.Error())
				continue
			}
			sslMapK8S.Store(ssl.ID, ssl.ID)
		}
	}

	// 1. collect the Kubernetes side, one goroutine per watched namespace.
	namespaces := p.namespaceProvider.WatchingNamespaces()
	for _, key := range namespaces {
		log.Debugf("start to watch namespace: %s", key)
		wg.Add(1)
		go func(ns string) {
			defer wg.Done()

			// Only the v2 API version is handled; anything else is reported
			// for both the route and the consumer/TLS sync and skipped.
			if version := p.common.Config.Kubernetes.APIVersion; version != config.ApisixV2 {
				log.Errorw("failed to sync ApisixRoute, unexpected version",
					zap.String("version", version),
				)
				log.Errorw("failed to sync ApisixConsumer, unexpected version",
					zap.String("version", version),
				)
				return
			}

			collectRoutes()
			// todo ApisixUpstream and ApisixPluginConfig
			// ApisixUpstream and ApisixPluginConfig should be synced with ApisixRoute resource
			collectConsumers(ns)
			collectTLS(ns)
		}(key)
	}
	wg.Wait()

	// 2. get all cached resources
	if err := p.listRouteCache(ctx, routeMapA6); err != nil {
		return err
	}
	if err := p.listStreamRouteCache(ctx, streamRouteMapA6); err != nil {
		return err
	}
	if err := p.listUpstreamCache(ctx, upstreamMapA6); err != nil {
		return err
	}
	if err := p.listSSLCache(ctx, sslMapA6); err != nil {
		return err
	}
	if err := p.listConsumerCache(ctx, consumerMapA6); err != nil {
		return err
	}
	if err := p.listPluginConfigCache(ctx, pluginConfigMapA6); err != nil {
		return err
	}

	// 3. compare
	routeResult := findRedundant(routeMapA6, routeMapK8S)
	streamRouteResult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
	upstreamResult := findRedundant(upstreamMapA6, upstreamMapK8S)
	sslResult := findRedundant(sslMapA6, sslMapK8S)
	consumerResult := findRedundant(consumerMapA6, consumerMapK8S)
	pluginConfigResult := findRedundant(pluginConfigMapA6, pluginConfigMapK8S)

	// 4. warn
	warnRedundantResources(routeResult, "route")
	warnRedundantResources(streamRouteResult, "streamRoute")
	warnRedundantResources(upstreamResult, "upstream")
	warnRedundantResources(sslResult, "ssl")
	warnRedundantResources(consumerResult, "consumer")
	warnRedundantResources(pluginConfigResult, "pluginConfig")
	return nil
}