in pkg/providers/utils/manifest.go [270:458]
// SyncManifests pushes the added, updated and deleted manifests to the APISIX
// cluster identified by clusterName.
//
// Any of the three manifests may be nil, in which case that phase is skipped.
// The phases run in the order add, update, delete. A failing operation does
// not abort the sync: every error is collected and the remaining resources are
// still processed.
//
// shouldCompare is forwarded unchanged to every Create call; Update calls
// always pass false.
//
// It returns nil when every operation succeeded, otherwise a *multierror.Error
// aggregating all failures. Deleting an upstream or plugin config that fails
// with cache.ErrStillInUse is logged and not reported as an error.
func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string, added, updated, deleted *Manifest, shouldCompare bool) error {
	var merr *multierror.Error

	if added != nil {
		// Creation order: SSLs, upstreams and plugin configs come before the
		// routes and stream routes; plugin metadata and global rules are last.
		for _, ssl := range added.SSLs {
			if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, u := range added.Upstreams {
			if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pc := range added.PluginConfigs {
			if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, r := range added.Routes {
			if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, sr := range added.StreamRoutes {
			if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pm := range added.PluginMetadatas {
			if _, err := apisix.Cluster(clusterName).PluginMetadata().Create(ctx, pm, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, gr := range added.GlobalRules {
			if _, err := apisix.Cluster(clusterName).GlobalRule().Create(ctx, gr, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
	}

	if updated != nil {
		// Updates follow the same resource order as creation.
		for _, ssl := range updated.SSLs {
			if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, u := range updated.Upstreams {
			if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, u, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pc := range updated.PluginConfigs {
			if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, r := range updated.Routes {
			if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, sr := range updated.StreamRoutes {
			if _, err := apisix.Cluster(clusterName).StreamRoute().Update(ctx, sr, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pm := range updated.PluginMetadatas {
			if _, err := apisix.Cluster(clusterName).PluginMetadata().Update(ctx, pm, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, gr := range updated.GlobalRules {
			if _, err := apisix.Cluster(clusterName).GlobalRule().Update(ctx, gr, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
	}

	if deleted != nil {
		for _, ssl := range deleted.SSLs {
			if err := apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		// Routes and stream routes are removed before the upstreams and plugin
		// configs, whose deletion can fail with cache.ErrStillInUse.
		for _, r := range deleted.Routes {
			if err := apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil {
				log.Warnw("failed to delete route, this may affect upstream deletions",
					zap.Error(err),
					zap.Any("route", r),
				)
				merr = multierror.Append(merr, err)
			}
		}
		for _, sr := range deleted.StreamRoutes {
			if err := apisix.Cluster(clusterName).StreamRoute().Delete(ctx, sr); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, u := range deleted.Upstreams {
			err := apisix.Cluster(clusterName).Upstream().Delete(ctx, u)
			if err == nil {
				continue
			}
			// NOTE(review): the sentinel is compared by identity, so a wrapped
			// cache.ErrStillInUse would be reported as a real failure.
			if err != cache.ErrStillInUse {
				merr = multierror.Append(merr, err)
				continue
			}

			// The upstream is still referenced; this is tolerated, not an error.
			log.Infow("upstream was referenced by other routes",
				zap.String("upstream_id", u.ID),
				zap.String("upstream_name", u.Name),
			)

			// Everything below is diagnostics only and is skipped unless debug
			// logging is enabled.
			if log.Level() > zap.DebugLevel {
				continue
			}

			// This could also happen when the route is synced (deleted) in
			// another syncManifest call that arrives later than this one, so
			// log the routes deleted in this call to check whether that is
			// the case.
			if len(deleted.Routes) == 0 {
				log.Debugw("syncManifest deletes upstream but doesn't delete any routes")
			} else {
				deletedRef := false
				for _, r := range deleted.Routes {
					if r.UpstreamId == u.ID {
						deletedRef = true
						log.Debugw("a deleted route is referencing upstream",
							zap.Any("route", r),
						)
					}
				}
				if !deletedRef {
					log.Debugw("no any deleted route is referencing this upstream",
						zap.String("upstream_id", u.ID),
					)
				}
			}

			// Try to find which route is referencing the upstream.
			routes, listErr := apisix.Cluster(clusterName).Route().List(ctx)
			if listErr != nil {
				log.Debugw("try to find referencing routes, but failed to list",
					zap.Error(listErr),
				)
				// Without a route list nothing can be concluded; do not emit
				// a misleading "failed to find" entry for an empty result.
				continue
			}
			liveRef := false
			for _, r := range routes {
				if r.UpstreamId == u.ID {
					liveRef = true
					log.Debugw("route is referencing upstream",
						zap.Any("route", r),
					)
				}
			}
			if !liveRef {
				log.Debugw("failed to find a route that references the upstream",
					zap.String("upstream_id", u.ID),
					zap.Any("routes", routes),
				)
			}
		}
		for _, pc := range deleted.PluginConfigs {
			err := apisix.Cluster(clusterName).PluginConfig().Delete(ctx, pc)
			if err == nil {
				continue
			}
			// A plugin config that is still referenced is tolerated as well.
			if err != cache.ErrStillInUse {
				merr = multierror.Append(merr, err)
				continue
			}
			log.Infow("plugin_config was referenced by other routes",
				zap.String("plugin_config_id", pc.ID),
				zap.String("plugin_config_name", pc.Name),
			)
		}
		for _, pm := range deleted.PluginMetadatas {
			if err := apisix.Cluster(clusterName).PluginMetadata().Delete(ctx, pm); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, gr := range deleted.GlobalRules {
			if err := apisix.Cluster(clusterName).GlobalRule().Delete(ctx, gr); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
	}

	// Return a literal nil on success: handing back the nil *multierror.Error
	// directly would produce a non-nil error interface value.
	if merr != nil {
		return merr
	}
	return nil
}