in pkg/apisix/cluster.go [397:452]
// syncSchemaOnce performs a single refresh of the plugin schemas held in the
// cluster cache.
//
// It first removes every schema currently in the cache (a failed removal is
// logged and skipped), then lists the plugins through c.plugin, fetches the
// schema of each one through c.schema and inserts it into the cache.
//
// An error is returned only when listing the cached schemas or listing the
// plugins fails. Failures for individual plugins never abort the call:
//   - a fetch error whose text contains "connection refused" stops the
//     iteration over the remaining plugins;
//   - any other fetch error is remembered and reported in one summary warning;
//   - an insert error is logged and the next plugin is processed.
//
// In all of those cases the "schema"/"success" sync metric is still
// incremented and nil is returned.
func (c *cluster) syncSchemaOnce(ctx context.Context) error {
	log.Infow("syncing schema", zap.String("cluster", c.name))

	// Drop everything that is cached so the cache is rebuilt from scratch.
	cached, listErr := c.cache.ListSchema()
	if listErr != nil {
		log.Errorf("failed to list schema in the cache: %s", listErr)
		return listErr
	}
	for _, stale := range cached {
		delErr := c.cache.DeleteSchema(stale)
		if delErr == nil {
			continue
		}
		log.Warnw("failed to delete schema in cache",
			zap.String("schemaName", stale.Name),
			zap.String("schemaContent", stale.Content),
			zap.String("error", delErr.Error()),
		)
	}

	// update plugins' schema.
	names, pluginErr := c.plugin.List(ctx)
	if pluginErr != nil {
		log.Errorf("failed to list plugin names in APISIX: %s", pluginErr)
		return pluginErr
	}

	// Plugins whose schema could not be fetched; reported together below.
	var unfetched []string
	for _, name := range names {
		fetched, fetchErr := c.schema.GetPluginSchema(ctx, name)
		if fetchErr == nil {
			if insertErr := c.cache.InsertSchema(fetched); insertErr != nil {
				log.Warnw("failed to insert schema to cache",
					zap.String("plugin", name),
					zap.String("cluster", c.name),
					zap.String("error", insertErr.Error()),
				)
			}
			continue
		}

		// The target is unreachable: give up on the remaining plugins.
		if strings.Contains(fetchErr.Error(), "connection refused") {
			log.Warnw("failed to get plugin schema, target connection refused",
				zap.Error(fetchErr),
			)
			break
		}
		unfetched = append(unfetched, name)
	}

	if len(unfetched) != 0 {
		log.Warnw("failed to get plugin schema",
			zap.Strings("plugins", unfetched),
		)
	}

	c.metricsCollector.IncrSyncOperation("schema", "success")
	return nil
}