in pkg/providers/apisix/apisix_plugin_config.go [92:223]
// sync processes a single ApisixPluginConfig event.
//
// It resolves the resource named by the event key through the lister,
// translates it into plugin configs, hands the resulting manifests to
// SyncManifests, and finally queues a status update on the worker pool.
//
// The returned error is the first failure from the translate or push steps
// (nil when both succeed). Key-parsing failures, unsupported group versions
// and lookup failures are returned immediately, and stale or already-gone
// resources yield nil; none of those early exits queue a status update.
func (c *apisixPluginConfigController) sync(ctx context.Context, ev *types.Event) error {
	event := ev.Object.(kube.ApisixPluginConfigEvent)
	namespace, name, err := cache.SplitMetaNamespaceKey(event.Key)
	if err != nil {
		log.Errorf("invalid resource key: %s", event.Key)
		return err
	}

	// Only the v2 group version is handled; everything below may rely on that.
	if event.GroupVersion != config.ApisixV2 {
		return fmt.Errorf("unsupported ApisixPluginConfig group version %s", event.GroupVersion)
	}

	isDelete := ev.Type == types.EventDelete

	var apc kube.ApisixPluginConfig
	apc, err = c.ApisixPluginConfigLister.V2(namespace, name)
	if err != nil {
		switch {
		case !k8serrors.IsNotFound(err):
			log.Errorw("failed to get ApisixPluginConfig",
				zap.String("version", event.GroupVersion),
				zap.String("key", event.Key),
				zap.Error(err),
			)
			return err
		case ev.Type == types.EventSync:
			// A delayed sync for a resource that no longer exists is not an error.
			return nil
		case !isDelete:
			log.Warnw("ApisixPluginConfig was deleted before it can be delivered",
				zap.String("key", event.Key),
				zap.String("version", event.GroupVersion),
			)
			return nil
		}
		// Not found while handling a DELETE event: expected, continue with
		// the tombstone below.
	}

	if isDelete {
		if apc != nil {
			// The lister still returns an object for this key, so a resource
			// with the same namespace and name exists again and this DELETE
			// event is stale.
			log.Warnw("discard the stale ApisixPluginConfig delete event since the resource still exists",
				zap.String("key", event.Key),
			)
			return nil
		}
		apc = ev.Tombstone.(kube.ApisixPluginConfig)
	}

	// translateAndPush runs the translation step and then the data-plane push,
	// returning the first error encountered so that the status update below
	// always runs afterwards with that error.
	translateAndPush := func() error {
		// Translation: build the plugin configs, or the delete marker when the
		// resource is being removed.
		var (
			tctx         *translation.TranslateContext
			translateErr error
		)
		if isDelete {
			tctx, translateErr = c.translator.GeneratePluginConfigV2DeleteMark(apc.V2())
		} else {
			tctx, translateErr = c.translator.TranslatePluginConfigV2(apc.V2())
		}
		if translateErr != nil {
			log.Errorw("failed to translate ApisixPluginConfig v2",
				zap.Error(translateErr),
				zap.Any("object", apc),
			)
			return translateErr
		}

		// Push: decide which manifests are added, updated or deleted.
		log.Debugw("translated ApisixPluginConfig",
			zap.Any("pluginConfigs", tctx.PluginConfigs),
		)
		current := &utils.Manifest{
			PluginConfigs: tctx.PluginConfigs,
		}

		var added, updated, deleted *utils.Manifest
		switch {
		case isDelete:
			deleted = current
		case ev.Type.IsAddEvent():
			added = current
		default:
			// Any other event type: diff the new translation against the
			// translation of the previous object carried by the event.
			oldCtx, oldErr := c.translator.TranslatePluginConfigV2(event.OldObject.V2())
			if oldErr != nil {
				log.Errorw("failed to translate old ApisixPluginConfig",
					zap.String("version", event.GroupVersion),
					zap.String("event", "update"),
					zap.Error(oldErr),
					zap.Any("ApisixPluginConfig", apc),
				)
				return oldErr
			}
			previous := &utils.Manifest{
				PluginConfigs: oldCtx.PluginConfigs,
			}
			added, updated, deleted = current.Diff(previous)
		}

		if syncErr := c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); syncErr != nil {
			log.Errorw("failed to sync ApisixPluginConfig to apisix",
				zap.Error(syncErr),
			)
			return syncErr
		}
		return nil
	}

	errRecord := translateAndPush()

	// Status is recorded on the worker pool whether the steps above succeeded
	// or failed; a cancelled work unit skips the update.
	c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}
		c.updateStatus(apc, errRecord)
		return true, nil
	})
	return errRecord
}