in pkg/providers/apisix/apisix_route.go [255:386]
func (c *apisixRouteController) sync(ctx context.Context, ev *types.Event) error {
obj := ev.Object.(kube.ApisixRouteEvent)
namespace, name, err := cache.SplitMetaNamespaceKey(obj.Key)
if err != nil {
log.Errorf("invalid resource key: %s", obj.Key)
return err
}
var (
ar kube.ApisixRoute
tctx *translation.TranslateContext
)
switch obj.GroupVersion {
case config.ApisixV2:
ar, err = c.ApisixRouteLister.V2(namespace, name)
default:
log.Errorw("unknown ApisixRoute version",
zap.String("version", obj.GroupVersion),
zap.String("key", obj.Key),
)
return fmt.Errorf("unknown ApisixRoute version %v", obj.GroupVersion)
}
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorw("failed to get ApisixRoute",
zap.String("version", obj.GroupVersion),
zap.String("key", obj.Key),
zap.Error(err),
)
return err
}
if ev.Type == types.EventSync {
// ignore not found error in delay sync
return nil
}
if ev.Type != types.EventDelete {
log.Warnw("ApisixRoute was deleted before it can be delivered",
zap.String("key", obj.Key),
zap.String("version", obj.GroupVersion),
)
return nil
}
}
// sync before translation
c.syncRelationship(ev, obj.Key, ar)
if ev.Type == types.EventDelete {
if ar != nil {
// We still find the resource while we are processing the DELETE event,
// that means object with same namespace and name was created, discarding
// this stale DELETE event.
log.Warnw("discard the stale ApisixRoute delete event since the resource still exists",
zap.String("key", obj.Key),
)
return nil
}
ar = ev.Tombstone.(kube.ApisixRoute)
}
// translator phase: translate resource, construction data plance context
{
switch obj.GroupVersion {
case config.ApisixV2:
if ev.Type != types.EventDelete {
if err = c.checkPluginNameIfNotEmptyV2(ctx, ar.V2()); err == nil {
tctx, err = c.translator.TranslateRouteV2(ar.V2())
}
} else {
tctx, err = c.translator.GenerateRouteV2DeleteMark(ar.V2())
}
if err != nil {
log.Errorw("failed to translate ApisixRoute v2",
zap.Error(err),
zap.Any("object", ar),
)
goto updateStatus
}
}
log.Debugw("translated ApisixRoute",
zap.Any("routes", tctx.Routes),
zap.Any("upstreams", tctx.Upstreams),
zap.Any("apisix_route", ar),
zap.Any("pluginConfigs", tctx.PluginConfigs),
)
}
// sync phase: Use context update data palne
{
m := &utils.Manifest{
Routes: tctx.Routes,
Upstreams: tctx.Upstreams,
StreamRoutes: tctx.StreamRoutes,
PluginConfigs: tctx.PluginConfigs,
}
var (
added *utils.Manifest
updated *utils.Manifest
deleted *utils.Manifest
)
if ev.Type == types.EventDelete {
deleted = m
} else if ev.Type.IsAddEvent() {
added = m
} else {
oldCtx, _ := c.translator.TranslateOldRoute(obj.OldObject)
om := &utils.Manifest{
Routes: oldCtx.Routes,
Upstreams: oldCtx.Upstreams,
StreamRoutes: oldCtx.StreamRoutes,
PluginConfigs: oldCtx.PluginConfigs,
}
added, updated, deleted = m.Diff(om)
}
if err = c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
log.Errorw("failed to sync ApisixRoute to apisix",
zap.Error(err),
)
goto updateStatus
}
}
updateStatus:
c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
if wu.IsCancelled() {
return nil, nil
}
c.updateStatus(ar, err)
return true, nil
})
return err
}