in pkg/providers/apisix/apisix_consumer.go [91:178]
// sync processes one queued ApisixConsumer event.
//
// It splits the event key into namespace and name, fetches the resource
// through the lister, translates it into a consumer object, hands that object
// to SyncConsumer together with the event type, and finally queues a status
// update on c.pool that carries the translate/sync outcome.
//
// The returned error is nil when the event was handled or deliberately
// dropped. Otherwise it is the key-parsing error, the unsupported-version
// error, the lister error (other than NotFound), or the error produced by
// translation or by SyncConsumer.
func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) error {
	event := ev.Object.(kube.ApisixConsumerEvent)
	key, version := event.Key, event.GroupVersion

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.Errorf("found ApisixConsumer resource with invalid meta namespace key %s: %s", key, err)
		return err
	}

	// Only the v2 group version is handled; anything else is rejected before
	// the lister is consulted.
	if version != config.ApisixV2 {
		return fmt.Errorf("unsupported ApisixConsumer group version %s", version)
	}

	var resource kube.ApisixConsumer
	resource, err = c.ApisixConsumerLister.V2(namespace, name)
	if err != nil {
		switch {
		case !k8serrors.IsNotFound(err):
			log.Errorw("failed to get ApisixConsumer",
				zap.Error(err),
				zap.String("key", key),
				zap.String("version", version),
			)
			return err
		case ev.Type == types.EventSync:
			// A missing resource during a delayed sync is not an error.
			return nil
		case ev.Type != types.EventDelete:
			log.Warnw("ApisixConsumer was deleted before it can be delivered",
				zap.String("key", key),
				zap.String("version", version),
			)
			// Nothing to retry: the resource is already gone.
			return nil
		}
		// NotFound on a DELETE event is expected; the tombstone is used below.
	}

	if ev.Type == types.EventDelete {
		if resource != nil {
			// The lister still returns an object for this key, so a resource
			// with the same namespace and name exists again and this DELETE
			// event is stale.
			log.Warnf("discard the stale ApisixConsumer delete event since the %s exists", key)
			return nil
		}
		resource = ev.Tombstone.(kube.ApisixConsumer)
	}

	// translateAndSync converts the resource into a consumer and pushes it via
	// SyncConsumer, returning the first failure (or nil on success).
	translateAndSync := func() error {
		ac := resource.V2()
		consumer, translateErr := c.translator.TranslateApisixConsumerV2(ac)
		if translateErr != nil {
			log.Errorw("failed to translate ApisixConsumer",
				zap.Error(translateErr),
				zap.Any("ApisixConsumer", ac),
			)
			return translateErr
		}
		log.Debugw("got consumer object from ApisixConsumer",
			zap.Any("consumer", consumer),
			zap.Any("ApisixConsumer", ac),
		)
		if syncErr := c.SyncConsumer(ctx, consumer, ev.Type); syncErr != nil {
			log.Errorw("failed to sync Consumer to APISIX",
				zap.Error(syncErr),
				zap.Any("consumer", consumer),
			)
			return syncErr
		}
		return nil
	}
	outcome := translateAndSync()

	// The status update is queued whether or not translate/sync succeeded, so
	// that the outcome (nil or the error) reaches updateStatus.
	c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
		if !wu.IsCancelled() {
			c.updateStatus(resource, outcome)
			return true, nil
		}
		return nil, nil
	})
	return outcome
}