func()

in pkg/apisix/cluster.go [462:517]


// syncSchemaOnce rebuilds the cached plugin schemas for this cluster in a
// single pass.
//
// It first removes every schema currently held in the cache, then lists the
// plugins through c.plugin and, for each one, fetches its schema through
// c.schema and inserts it into the cache.
//
// Only a failure to list the cached schemas or to list the plugins is
// returned as an error. Everything else is best-effort:
//   - a failed cache delete or insert is logged and skipped;
//   - a failed schema fetch is remembered and reported in one aggregated
//     warning after the loop;
//   - a fetch error whose text contains "connection refused" stops the loop
//     early, leaving the remaining plugins unprocessed.
//
// In all of the best-effort cases the method still records a "success"
// schema sync operation on the metrics collector and returns nil.
func (c *cluster) syncSchemaOnce(ctx context.Context) error {
	log.Infow("syncing schema", zap.String("cluster", c.name))

	cached, err := c.cache.ListSchema()
	if err != nil {
		log.Errorf("failed to list schema in the cache: %s", err)
		return err
	}

	// Clear out the existing entries before repopulating; a failed delete is
	// only worth a warning.
	for _, entry := range cached {
		delErr := c.cache.DeleteSchema(entry)
		if delErr == nil {
			continue
		}
		log.Warnw("failed to delete schema in cache",
			zap.String("schemaName", entry.Name),
			zap.String("schemaContent", entry.Content),
			zap.String("error", delErr.Error()),
		)
	}

	// Refresh the schema of every plugin reported by APISIX.
	names, err := c.plugin.List(ctx)
	if err != nil {
		log.Errorf("failed to list plugin names in APISIX: %s", err)
		return err
	}

	var unresolved []string
fetchLoop:
	for _, name := range names {
		fetched, getErr := c.schema.GetPluginSchema(ctx, name)
		switch {
		case getErr == nil:
			if insErr := c.cache.InsertSchema(fetched); insErr != nil {
				log.Warnw("failed to insert schema to cache",
					zap.String("plugin", name),
					zap.String("cluster", c.name),
					zap.String("error", insErr.Error()),
				)
			}
		case strings.Contains(getErr.Error(), "connection refused"):
			// The target refused the connection, so stop asking for the
			// remaining plugins.
			log.Warnw("failed to get plugin schema, target connection refused",
				zap.Error(getErr),
			)
			break fetchLoop
		default:
			unresolved = append(unresolved, name)
		}
	}

	// Report all plugins whose schema could not be fetched in one message.
	if len(unresolved) > 0 {
		log.Warnw("failed to get plugin schema",
			zap.Strings("plugins", unresolved),
		)
	}

	c.metricsCollector.IncrSyncOperation("schema", "success")
	return nil
}