in pkg/providers/ingress/ingress.go [108:238]
// sync reconciles a single Ingress event with APISIX.
//
// It looks the Ingress up through the lister that matches the event's group
// version, records the TLS secrets the Ingress references, translates the
// Ingress into routes/upstreams/SSLs/pluginConfigs, and pushes the resulting
// manifests through SyncManifests. Once translation has been attempted, a
// status update for the Ingress is queued on c.pool whether or not the
// translation or sync succeeded.
//
// The returned error is the first failure encountered (invalid key, lister
// failure, unusable tombstone, translation failure, or sync failure). nil is
// returned on success, when a non-delete event refers to an Ingress that no
// longer exists, and when a stale DELETE event is discarded.
//
// ev.Object must hold a kube.IngressEvent; the assertion is unchecked and
// panics otherwise.
func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
	ingEv := ev.Object.(kube.IngressEvent)
	namespace, name, err := cache.SplitMetaNamespaceKey(ingEv.Key)
	if err != nil {
		log.Errorf("found ingress resource with invalid meta namespace key %s: %s", ingEv.Key, err)
		return err
	}

	var (
		ing  kube.Ingress
		tctx *translation.TranslateContext
	)
	switch ingEv.GroupVersion {
	case kube.IngressV1:
		ing, err = c.IngressLister.V1(namespace, name)
	case kube.IngressV1beta1:
		ing, err = c.IngressLister.V1beta1(namespace, name)
	default:
		err = fmt.Errorf("unsupported group version %s, one of (%s/%s) is expected", ingEv.GroupVersion,
			kube.IngressV1, kube.IngressV1beta1)
	}
	if err != nil {
		if !k8serrors.IsNotFound(err) {
			log.Errorf("failed to get ingress %s (group version: %s): %s", ingEv.Key, ingEv.GroupVersion, err)
			return err
		}
		if ev.Type != types.EventDelete {
			log.Warnf("ingress %s (group version: %s) was deleted before it can be delivered", ingEv.Key, ingEv.GroupVersion)
			// Don't need to retry.
			return nil
		}
		// NotFound on a DELETE event is the expected case; the tombstone is
		// used below and err is overwritten by the translation step.
	}
	if ev.Type == types.EventDelete {
		if ing != nil {
			// We still find the resource while we are processing the DELETE event,
			// that means object with same namespace and name was created, discarding
			// this stale DELETE event.
			log.Warnf("discard the stale ingress delete event since the %s exists", ingEv.Key)
			return nil
		}
		// Use a checked assertion: a missing or unexpected tombstone must
		// surface as an error instead of panicking the worker.
		tombstone, ok := ev.Tombstone.(kube.Ingress)
		if !ok {
			err = fmt.Errorf("ingress %s delete event has no usable tombstone (got %T)", ingEv.Key, ev.Tombstone)
			log.Errorf("failed to handle ingress delete event: %s", err)
			return err
		}
		ing = tombstone
	}

	var secrets []string
	switch ingEv.GroupVersion {
	case kube.IngressV1:
		for _, tls := range ing.V1().Spec.TLS {
			secrets = append(secrets, tls.SecretName)
		}
	case kube.IngressV1beta1:
		for _, tls := range ing.V1beta1().Spec.TLS {
			secrets = append(secrets, tls.SecretName)
		}
	}
	for _, secret := range secrets {
		// We don't support annotation in Ingress
		// 	_caAnnotation = "nginx.ingress.kubernetes.io/auth-tls-secret"
		c.storeSecretReference(namespace+"/"+secret, ingEv.Key+"_"+ingEv.GroupVersion, ev.Type)
	}

	// The two steps below live in their own blocks so that no variable is
	// declared at function scope between a `goto updateStatus` and the label,
	// which Go forbids.
	{
		if ev.Type == types.EventDelete {
			tctx, err = c.translator.TranslateIngressDeleteEvent(ing)
		} else {
			tctx, err = c.translator.TranslateIngress(ing)
		}
		if err != nil {
			log.Errorw("failed to translate ingress",
				zap.Error(err),
				zap.Any("ingress", ing),
			)
			goto updateStatus
		}
		log.Debugw("translated ingress resource to a couple of routes, upstreams and pluginConfigs",
			zap.Any("ingress", ing),
			zap.Any("routes", tctx.Routes),
			zap.Any("upstreams", tctx.Upstreams),
			zap.Any("ssl", tctx.SSL),
			zap.Any("pluginConfigs", tctx.PluginConfigs),
		)
	}
	{
		m := &utils.Manifest{
			SSLs:          tctx.SSL,
			Routes:        tctx.Routes,
			Upstreams:     tctx.Upstreams,
			PluginConfigs: tctx.PluginConfigs,
		}
		var (
			added   *utils.Manifest
			updated *utils.Manifest
			deleted *utils.Manifest
		)
		if ev.Type == types.EventDelete {
			deleted = m
		} else if ev.Type.IsAddEvent() {
			added = m
		} else {
			// Any other event type is handled as an update: diff the new
			// manifest against the one derived from the previous object.
			oldCtx, oldErr := c.translator.TranslateOldIngress(ingEv.OldObject)
			if oldErr == nil && oldCtx == nil {
				oldErr = fmt.Errorf("translating old ingress %s produced no result", ingEv.Key)
			}
			if oldErr != nil {
				// Without the old manifest there is nothing reliable to diff
				// against; report the failure instead of dereferencing a
				// possibly nil context.
				log.Errorw("failed to translate old ingress",
					zap.Error(oldErr),
					zap.Any("ingress", ingEv.OldObject),
				)
				err = oldErr
				goto updateStatus
			}
			om := &utils.Manifest{
				Routes:        oldCtx.Routes,
				Upstreams:     oldCtx.Upstreams,
				SSLs:          oldCtx.SSL,
				PluginConfigs: oldCtx.PluginConfigs,
			}
			added, updated, deleted = m.Diff(om)
		}
		if err = c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
			log.Errorw("failed to sync Ingress to apisix",
				zap.Error(err),
			)
			goto updateStatus
		}
	}
updateStatus:
	// The status update is handed to the pool rather than run inline; a
	// cancelled work unit skips it.
	c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}
		c.UpdateStatus(ing)
		return true, nil
	})
	return err
}