func()

in pkg/apisix/cluster.go [295:402]


func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
	routes, err := c.route.List(ctx)
	if err != nil {
		log.Errorf("failed to list routes in APISIX: %s", err)
		return false, err
	}
	upstreams, err := c.upstream.List(ctx)
	if err != nil {
		log.Errorf("failed to list upstreams in APISIX: %s", err)
		return false, err
	}
	ssl, err := c.ssl.List(ctx)
	if err != nil {
		log.Errorf("failed to list ssl in APISIX: %s", err)
		return false, err
	}
	streamRoutes, err := c.streamRoute.List(ctx)
	if err != nil {
		log.Errorf("failed to list stream_routes in APISIX: %s", err)
		return false, err
	}
	globalRules, err := c.globalRules.List(ctx)
	if err != nil {
		log.Errorf("failed to list global_rules in APISIX: %s", err)
		return false, err
	}
	consumers, err := c.consumer.List(ctx)
	if err != nil {
		log.Errorf("failed to list consumers in APISIX: %s", err)
		return false, err
	}
	pluginConfigs, err := c.pluginConfig.List(ctx)
	if err != nil {
		log.Errorf("failed to list plugin_configs in APISIX: %s", err)
		return false, err
	}

	for _, r := range routes {
		if err := c.cache.InsertRoute(r); err != nil {
			log.Errorw("failed to insert route to cache",
				zap.String("route", r.ID),
				zap.String("cluster", c.name),
				zap.String("error", err.Error()),
			)
			return false, err
		}
	}
	for _, u := range upstreams {
		if err := c.cache.InsertUpstream(u); err != nil {
			log.Errorw("failed to insert upstream to cache",
				zap.String("upstream", u.ID),
				zap.String("cluster", c.name),
				zap.String("error", err.Error()),
			)
			return false, err
		}
	}
	for _, s := range ssl {
		if err := c.cache.InsertSSL(s); err != nil {
			log.Errorw("failed to insert ssl to cache",
				zap.String("ssl", s.ID),
				zap.String("cluster", c.name),
				zap.String("error", err.Error()),
			)
			return false, err
		}
	}
	for _, sr := range streamRoutes {
		if err := c.cache.InsertStreamRoute(sr); err != nil {
			log.Errorw("failed to insert stream_route to cache",
				zap.Any("stream_route", sr),
				zap.String("cluster", c.name),
				zap.String("error", err.Error()),
			)
			return false, err
		}
	}
	for _, gr := range globalRules {
		if err := c.cache.InsertGlobalRule(gr); err != nil {
			log.Errorw("failed to insert global_rule to cache",
				zap.Any("global_rule", gr),
				zap.String("cluster", c.name),
				zap.String("error", err.Error()),
			)
			return false, err
		}
	}
	for _, consumer := range consumers {
		if err := c.cache.InsertConsumer(consumer); err != nil {
			log.Errorw("failed to insert consumer to cache",
				zap.Any("consumer", consumer),
				zap.String("cluster", c.name),
				zap.String("error", err.Error()),
			)
		}
	}
	for _, u := range pluginConfigs {
		if err := c.cache.InsertPluginConfig(u); err != nil {
			log.Errorw("failed to insert pluginConfig to cache",
				zap.String("pluginConfig", u.ID),
				zap.String("cluster", c.name),
				zap.String("error", err.Error()),
			)
			return false, err
		}
	}
	return true, nil
}