func()

in pkg/providers/apisix/apisix_plugin_config.go [92:223]


// sync processes a single ApisixPluginConfig event.
//
// It looks the resource up through the lister using the key carried by the
// event, translates it (or, for a delete event, the tombstone attached to the
// event) into plugin configs, and hands the resulting added/updated/deleted
// manifests to SyncManifests. Once translation has been attempted, a status
// update for the resource is queued on c.pool regardless of the outcome, and
// the recorded error (nil on success) is returned.
//
// The following cases return early without queuing a status update:
//   - the event payload is not a kube.ApisixPluginConfigEvent (error);
//   - the key cannot be split into namespace/name (error);
//   - the group version is not supported (error);
//   - the lister fails with anything other than NotFound (error);
//   - the resource is not found for a non-delete event (nil);
//   - a delete event arrives while the resource still exists (nil);
//   - a delete event carries no usable tombstone (error).
func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event) error {
	// Use the comma-ok form so that an unexpected payload is reported as an
	// error instead of panicking the worker.
	obj, isEvent := ev.Object.(kube.ApisixPluginConfigEvent)
	if !isEvent {
		return fmt.Errorf("unexpected ApisixPluginConfig event object type %T", ev.Object)
	}
	namespace, name, err := cache.SplitMetaNamespaceKey(obj.Key)
	if err != nil {
		log.Errorf("invalid resource key: %s", obj.Key)
		return err
	}
	var (
		apc  kube.ApisixPluginConfig
		tctx *translation.TranslateContext
	)
	switch obj.GroupVersion {
	case config.ApisixV2:
		apc, err = c.ApisixPluginConfigLister.V2(namespace, name)
	default:
		return fmt.Errorf("unsupported ApisixPluginConfig group version %s", obj.GroupVersion)
	}
	if err != nil {
		if !k8serrors.IsNotFound(err) {
			log.Errorw("failed to get ApisixPluginConfig",
				zap.String("version", obj.GroupVersion),
				zap.String("key", obj.Key),
				zap.Error(err),
			)
			return err
		}

		if ev.Type == types.EventSync {
			// ignore not found error in delay sync
			return nil
		}
		if ev.Type != types.EventDelete {
			log.Warnw("ApisixPluginConfig was deleted before it can be delivered",
				zap.String("key", obj.Key),
				zap.String("version", obj.GroupVersion),
			)
			return nil
		}
		// NotFound on a delete event is expected: fall through and work
		// from the tombstone below.
	}
	if ev.Type == types.EventDelete {
		if apc != nil {
			// We still find the resource while we are processing the DELETE event,
			// that means object with same namespace and name was created, discarding
			// this stale DELETE event.
			log.Warnw("discard the stale ApisixPluginConfig delete event since the resource still exists",
				zap.String("key", obj.Key),
			)
			return nil
		}
		// A nil or mistyped tombstone fails the assertion; report it rather
		// than panicking, and skip the status update since there is no
		// object to report on.
		tombstone, isAPC := ev.Tombstone.(kube.ApisixPluginConfig)
		if !isAPC {
			return fmt.Errorf("unexpected tombstone type %T for ApisixPluginConfig %s", ev.Tombstone, obj.Key)
		}
		apc = tombstone
	}
	// translator phase: translate the resource and build the data plane context
	var errRecord error
	{
		switch obj.GroupVersion {
		case config.ApisixV2:
			if ev.Type != types.EventDelete {
				tctx, err = c.translator.TranslatePluginConfigV2(apc.V2())
			} else {
				tctx, err = c.translator.GeneratePluginConfigV2DeleteMark(apc.V2())
			}
			if err != nil {
				log.Errorw("failed to translate ApisixPluginConfig v2",
					zap.Error(err),
					zap.Any("object", apc),
				)
				errRecord = err
				goto updatestatus
			}
		}

	}
	// sync phase: use the translated context to update the data plane
	{
		log.Debugw("translated ApisixPluginConfig",
			zap.Any("pluginConfigs", tctx.PluginConfigs),
		)
		m := &utils.Manifest{
			PluginConfigs: tctx.PluginConfigs,
		}

		var (
			added   *utils.Manifest
			updated *utils.Manifest
			deleted *utils.Manifest
		)

		if ev.Type == types.EventDelete {
			deleted = m
		} else if ev.Type.IsAddEvent() {
			added = m
		} else {
			// Remaining event types: translate the previous object as well
			// and diff the new manifest against it.
			var oldCtx *translation.TranslateContext
			switch obj.GroupVersion {
			case config.ApisixV2:
				oldCtx, err = c.translator.TranslatePluginConfigV2(obj.OldObject.V2())
			}
			if err != nil {
				log.Errorw("failed to translate old ApisixPluginConfig",
					zap.String("version", obj.GroupVersion),
					zap.String("event", "update"),
					zap.Error(err),
					zap.Any("ApisixPluginConfig", apc),
				)
				errRecord = err
				goto updatestatus
			}

			om := &utils.Manifest{
				PluginConfigs: oldCtx.PluginConfigs,
			}
			added, updated, deleted = m.Diff(om)
		}

		if err := c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
			log.Errorw("failed to sync ApisixPluginConfig to apisix",
				zap.Error(err),
			)
			errRecord = err
			goto updatestatus
		}
	}
updatestatus:
	// Queue the status update on the pool; errRecord is nil on success and
	// holds the translation or sync error otherwise.
	c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}
		c.updateStatus(apc, errRecord)
		return true, nil
	})
	return errRecord
}