func()

in pkg/providers/apisix/apisix_route.go [256:395]


func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error {
	obj := ev.Object.(kube.ApisixRouteEvent)
	namespace, name, err := cache.SplitMetaNamespaceKey(obj.Key)
	if err != nil {
		log.Errorf("invalid resource key: %s", obj.Key)
		return err
	}
	var (
		ar   kube.ApisixRoute
		tctx *translation.TranslateContext
	)
	switch obj.GroupVersion {
	case config.ApisixV2:
		ar, err = c.ApisixRouteLister.V2(namespace, name)
	default:
		log.Errorw("unknown ApisixRoute version",
			zap.String("version", obj.GroupVersion),
			zap.String("key", obj.Key),
		)
		return fmt.Errorf("unknown ApisixRoute version %v", obj.GroupVersion)
	}
	if err != nil {
		if !k8serrors.IsNotFound(err) {
			log.Errorw("failed to get ApisixRoute",
				zap.String("version", obj.GroupVersion),
				zap.String("key", obj.Key),
				zap.Error(err),
			)
			return err
		}

		if ev.Type == types.EventSync {
			// ignore not found error in delay sync
			return nil
		}
		if ev.Type != types.EventDelete {
			log.Warnw("ApisixRoute was deleted before it can be delivered",
				zap.String("key", obj.Key),
				zap.String("version", obj.GroupVersion),
			)
			return nil
		}
	}

	// sync before translation
	c.syncRelationship(ev, obj.Key, ar)

	if ev.Type == types.EventDelete {
		if ar != nil {
			// We still find the resource while we are processing the DELETE event,
			// that means object with same namespace and name was created, discarding
			// this stale DELETE event.
			log.Warnw("discard the stale ApisixRoute delete event since the resource still exists",
				zap.String("key", obj.Key),
			)
			return nil
		}
		ar = ev.Tombstone.(kube.ApisixRoute)
	}
	// translator phase: translate resource, construction data plance context
	{
		switch obj.GroupVersion {
		case config.ApisixV2:
			if ev.Type != types.EventDelete {
				if err = c.checkPluginNameIfNotEmptyV2(ctx, ar.V2()); err == nil {
					tctx, err = c.translator.TranslateRouteV2(ar.V2())
				}
			} else {
				tctx, err = c.translator.GenerateRouteV2DeleteMark(ar.V2())
			}
			if err != nil {
				log.Errorw("failed to translate ApisixRoute v2",
					zap.Error(err),
					zap.Any("object", ar),
				)
				goto updateStatus
			}
		}

		log.Debugw("translated ApisixRoute",
			zap.Any("routes", tctx.Routes),
			zap.Any("upstreams", tctx.Upstreams),
			zap.Any("apisix_route", ar),
			zap.Any("pluginConfigs", tctx.PluginConfigs),
		)
	}
	// sync phase: Use context update data palne
	{
		m := &utils.Manifest{
			Routes:        tctx.Routes,
			Upstreams:     tctx.Upstreams,
			StreamRoutes:  tctx.StreamRoutes,
			PluginConfigs: tctx.PluginConfigs,
		}
		var (
			added   *utils.Manifest
			updated *utils.Manifest
			deleted *utils.Manifest
		)

		if ev.Type == types.EventDelete {
			deleted = m
		} else if ev.Type.IsAddEvent() {
			added = m
		} else {
			oldCtx, _ := c.translator.TranslateOldRoute(obj.OldObject)
			if oldCtx != nil {
				om := &utils.Manifest{
					Routes:        oldCtx.Routes,
					Upstreams:     oldCtx.Upstreams,
					StreamRoutes:  oldCtx.StreamRoutes,
					PluginConfigs: oldCtx.PluginConfigs,
				}
				added, updated, deleted = m.Diff(om)
			}
		}

		log.Debugw("sync ApisixRoute to cluster",
			zap.String("event_type", ev.Type.String()),
			zap.Any("add", added),
			zap.Any("update", updated),
			zap.Any("delete", deleted),
		)
		if err = c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
			log.Errorw("failed to sync ApisixRoute to apisix",
				zap.Error(err),
			)
			goto updateStatus
		}
	}
updateStatus:
	c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}
		c.updateStatus(ar, err)
		return true, nil
	})
	return err
}