func SyncManifests()

in pkg/providers/utils/manifest.go [270:458]


// SyncManifests pushes the given manifests to the APISIX cluster named
// clusterName. The three manifests are processed in the order added, updated,
// deleted; a nil manifest is skipped.
//
// Resources in added are created with shouldCompare forwarded to every Create
// call. Resources in updated are always written with the compare flag set to
// false. Resources in deleted are removed.
//
// A failure on one resource does not stop the sync: every error is collected
// and the remaining resources are still processed. The returned error
// aggregates all collected failures, or is nil if there were none.
//
// Deleting an upstream or a plugin config that fails with
// cache.ErrStillInUse is not treated as a failure; it is only logged.
func SyncManifests(ctx context.Context, apisix apisix.APISIX, clusterName string, added, updated, deleted *Manifest, shouldCompare bool) error {
	var merr *multierror.Error

	if added != nil {
		// Creation order: SSLs, upstreams and plugin configs go before the
		// routes so that anything a route refers to is created ahead of it.
		for _, ssl := range added.SSLs {
			if _, err := apisix.Cluster(clusterName).SSL().Create(ctx, ssl, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, u := range added.Upstreams {
			if _, err := apisix.Cluster(clusterName).Upstream().Create(ctx, u, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pc := range added.PluginConfigs {
			if _, err := apisix.Cluster(clusterName).PluginConfig().Create(ctx, pc, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, r := range added.Routes {
			if _, err := apisix.Cluster(clusterName).Route().Create(ctx, r, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, sr := range added.StreamRoutes {
			if _, err := apisix.Cluster(clusterName).StreamRoute().Create(ctx, sr, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pm := range added.PluginMetadatas {
			if _, err := apisix.Cluster(clusterName).PluginMetadata().Create(ctx, pm, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, gr := range added.GlobalRules {
			if _, err := apisix.Cluster(clusterName).GlobalRule().Create(ctx, gr, shouldCompare); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
	}

	if updated != nil {
		// Updates never use the caller's shouldCompare; the flag is always false.
		for _, ssl := range updated.SSLs {
			if _, err := apisix.Cluster(clusterName).SSL().Update(ctx, ssl, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, u := range updated.Upstreams {
			if _, err := apisix.Cluster(clusterName).Upstream().Update(ctx, u, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pc := range updated.PluginConfigs {
			if _, err := apisix.Cluster(clusterName).PluginConfig().Update(ctx, pc, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, r := range updated.Routes {
			if _, err := apisix.Cluster(clusterName).Route().Update(ctx, r, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, sr := range updated.StreamRoutes {
			if _, err := apisix.Cluster(clusterName).StreamRoute().Update(ctx, sr, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, pm := range updated.PluginMetadatas {
			if _, err := apisix.Cluster(clusterName).PluginMetadata().Update(ctx, pm, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, gr := range updated.GlobalRules {
			if _, err := apisix.Cluster(clusterName).GlobalRule().Update(ctx, gr, false); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
	}

	if deleted != nil {
		// Deletion order: routes and stream routes are removed before the
		// upstreams and plugin configs, so that references from routes deleted
		// in this call are gone before their targets are removed.
		for _, ssl := range deleted.SSLs {
			if err := apisix.Cluster(clusterName).SSL().Delete(ctx, ssl); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, r := range deleted.Routes {
			if err := apisix.Cluster(clusterName).Route().Delete(ctx, r); err != nil {
				log.Warnw("failed to delete route, this may affect upstream deletions",
					zap.Error(err),
					zap.Any("route", r),
				)
				merr = multierror.Append(merr, err)
			}
		}
		for _, sr := range deleted.StreamRoutes {
			if err := apisix.Cluster(clusterName).StreamRoute().Delete(ctx, sr); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, u := range deleted.Upstreams {
			err := apisix.Cluster(clusterName).Upstream().Delete(ctx, u)
			if err == nil {
				continue
			}
			// NOTE(review): this is an identity comparison, so a wrapped
			// cache.ErrStillInUse would be reported as a real failure.
			// Consider errors.Is if the cache layer ever wraps its errors.
			if err != cache.ErrStillInUse {
				merr = multierror.Append(merr, err)
				continue
			}

			// The upstream is still referenced by some route: not an error.
			log.Infow("upstream was referenced by other routes",
				zap.String("upstream_id", u.ID),
				zap.String("upstream_name", u.Name),
			)

			// Everything below is diagnostics only and runs solely when debug
			// logging is enabled.
			if log.Level() > zap.DebugLevel {
				continue
			}

			// This could also happen when the route is synced (deleted) in
			// another SyncManifests call that arrives later than this one, so
			// report which routes deleted in this call point at the upstream.
			if len(deleted.Routes) == 0 {
				log.Debugw("syncManifest deletes upstream but doesn't delete any routes")
			} else {
				deletedRef := false
				for _, r := range deleted.Routes {
					if r.UpstreamId == u.ID {
						deletedRef = true
						log.Debugw("a deleted route is referencing upstream",
							zap.Any("route", r),
						)
					}
				}
				if !deletedRef {
					log.Debugw("no any deleted route is referencing this upstream",
						zap.String("upstream_id", u.ID),
					)
				}
			}

			// Try to find which currently known route references the upstream.
			routes, listErr := apisix.Cluster(clusterName).Route().List(ctx)
			if listErr != nil {
				log.Debugw("try to find referencing routes, but failed to list",
					zap.Error(listErr),
				)
				// Without a route list nothing can be concluded; do not emit
				// a misleading "no referencing route" message.
				continue
			}

			liveRef := false
			for _, r := range routes {
				if r.UpstreamId == u.ID {
					liveRef = true
					log.Debugw("route is referencing upstream",
						zap.Any("route", r),
					)
				}
			}
			if !liveRef {
				log.Debugw("failed to find a route that references the upstream",
					zap.String("upstream_id", u.ID),
					zap.Any("routes", routes),
				)
			}
		}
		for _, pc := range deleted.PluginConfigs {
			err := apisix.Cluster(clusterName).PluginConfig().Delete(ctx, pc)
			if err == nil {
				continue
			}
			// NOTE(review): identity comparison, same caveat as for upstreams.
			if err != cache.ErrStillInUse {
				merr = multierror.Append(merr, err)
				continue
			}
			// The plugin config is still referenced by some route: not an error.
			log.Infow("plugin_config was referenced by other routes",
				zap.String("plugin_config_id", pc.ID),
				zap.String("plugin_config_name", pc.Name),
			)
		}
		for _, pm := range deleted.PluginMetadatas {
			if err := apisix.Cluster(clusterName).PluginMetadata().Delete(ctx, pm); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
		for _, gr := range deleted.GlobalRules {
			if err := apisix.Cluster(clusterName).GlobalRule().Delete(ctx, gr); err != nil {
				merr = multierror.Append(merr, err)
			}
		}
	}

	// ErrorOrNil yields an untyped nil error when nothing was collected, which
	// avoids handing callers a non-nil interface that wraps a nil pointer.
	return merr.ErrorOrNil()
}