func()

in pkg/providers/ingress/ingress.go [108:238]


// sync processes a single Ingress event.
//
// It looks the Ingress up through the lister that matches the event's group
// version, records the TLS secrets the Ingress references, translates the
// Ingress into routes, upstreams, SSL objects and plugin configs, and hands
// the resulting manifests to SyncManifests. For DELETE events the object is
// taken from ev.Tombstone, because the lister no longer holds it.
//
// Once translation has been attempted, a status update for the Ingress is
// queued on c.pool regardless of whether translation or syncing succeeded,
// and the translation/sync error (if any) is returned. The earlier exits
// (malformed event, invalid key, lookup failure, stale DELETE, object already
// gone) return without queueing a status update.
func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
	// Use a checked assertion so a malformed event surfaces as an error
	// instead of panicking the worker.
	ingEv, ok := ev.Object.(kube.IngressEvent)
	if !ok {
		err := fmt.Errorf("unexpected ingress event object type %T", ev.Object)
		log.Errorf("failed to sync ingress: %s", err)
		return err
	}
	namespace, name, err := cache.SplitMetaNamespaceKey(ingEv.Key)
	if err != nil {
		log.Errorf("found ingress resource with invalid meta namespace key %s: %s", ingEv.Key, err)
		return err
	}

	var (
		ing  kube.Ingress
		tctx *translation.TranslateContext
	)
	switch ingEv.GroupVersion {
	case kube.IngressV1:
		ing, err = c.IngressLister.V1(namespace, name)
	case kube.IngressV1beta1:
		ing, err = c.IngressLister.V1beta1(namespace, name)
	default:
		err = fmt.Errorf("unsupported group version %s, one of (%s/%s) is expected", ingEv.GroupVersion,
			kube.IngressV1, kube.IngressV1beta1)
	}

	if err != nil {
		if !k8serrors.IsNotFound(err) {
			log.Errorf("failed to get ingress %s (group version: %s): %s", ingEv.Key, ingEv.GroupVersion, err)
			return err
		}

		// Not found is expected for DELETE events; for any other event type
		// the object vanished before we could handle it.
		if ev.Type != types.EventDelete {
			log.Warnf("ingress %s (group version: %s) was deleted before it can be delivered", ingEv.Key, ingEv.GroupVersion)
			// Don't need to retry.
			return nil
		}
	}

	if ev.Type == types.EventDelete {
		if ing != nil {
			// We still find the resource while we are processing the DELETE event,
			// that means object with same namespace and name was created, discarding
			// this stale DELETE event.
			log.Warnf("discard the stale ingress delete event since the %s exists", ingEv.Key)
			return nil
		}
		// A DELETE event without a usable tombstone cannot be translated;
		// report it rather than panicking on the assertion.
		tombstone, ok := ev.Tombstone.(kube.Ingress)
		if !ok {
			err = fmt.Errorf("ingress %s delete event has unusable tombstone of type %T", ingEv.Key, ev.Tombstone)
			log.Errorf("failed to sync ingress: %s", err)
			return err
		}
		ing = tombstone
	}

	// Collect the TLS secret names referenced by the Ingress spec.
	var secrets []string
	switch ingEv.GroupVersion {
	case kube.IngressV1:
		for _, tls := range ing.V1().Spec.TLS {
			secrets = append(secrets, tls.SecretName)
		}
	case kube.IngressV1beta1:
		for _, tls := range ing.V1beta1().Spec.TLS {
			secrets = append(secrets, tls.SecretName)
		}
	}

	for _, secret := range secrets {
		// We don't support annotation in Ingress
		// 	_caAnnotation = "nginx.ingress.kubernetes.io/auth-tls-secret"
		c.storeSecretReference(namespace+"/"+secret, ingEv.Key+"_"+ingEv.GroupVersion, ev.Type)
	}

	// NOTE: the bare blocks below keep their locals block-scoped so that
	// `goto updateStatus` never jumps over a variable declaration.
	{
		if ev.Type == types.EventDelete {
			tctx, err = c.translator.TranslateIngressDeleteEvent(ing)
		} else {
			tctx, err = c.translator.TranslateIngress(ing)
		}
		if err != nil {
			log.Errorw("failed to translate ingress",
				zap.Error(err),
				zap.Any("ingress", ing),
			)
			goto updateStatus
		}

		log.Debugw("translated ingress resource to a couple of routes, upstreams and pluginConfigs",
			zap.Any("ingress", ing),
			zap.Any("routes", tctx.Routes),
			zap.Any("upstreams", tctx.Upstreams),
			zap.Any("ssl", tctx.SSL),
			zap.Any("pluginConfigs", tctx.PluginConfigs),
		)
	}
	{
		m := &utils.Manifest{
			SSLs:          tctx.SSL,
			Routes:        tctx.Routes,
			Upstreams:     tctx.Upstreams,
			PluginConfigs: tctx.PluginConfigs,
		}

		var (
			added   *utils.Manifest
			updated *utils.Manifest
			deleted *utils.Manifest
		)

		if ev.Type == types.EventDelete {
			deleted = m
		} else if ev.Type.IsAddEvent() {
			added = m
		} else {
			// Update: diff the new manifest against the one derived from the
			// previous version of the object.
			oldCtx, oldErr := c.translator.TranslateOldIngress(ingEv.OldObject)
			if oldCtx == nil {
				// Without the old state there is nothing to diff against;
				// fail the sync instead of dereferencing a nil context.
				if oldErr == nil {
					oldErr = fmt.Errorf("translating old ingress %s yielded no context", ingEv.Key)
				}
				err = oldErr
				log.Errorw("failed to translate old ingress",
					zap.Error(err),
					zap.Any("old_ingress", ingEv.OldObject),
				)
				goto updateStatus
			}
			if oldErr != nil {
				// A context was still produced, so keep the best-effort diff
				// but make the failure visible.
				log.Errorw("failed to fully translate old ingress, diffing against partial result",
					zap.Error(oldErr),
					zap.Any("old_ingress", ingEv.OldObject),
				)
			}
			om := &utils.Manifest{
				Routes:        oldCtx.Routes,
				Upstreams:     oldCtx.Upstreams,
				SSLs:          oldCtx.SSL,
				PluginConfigs: oldCtx.PluginConfigs,
			}
			added, updated, deleted = m.Diff(om)
		}
		if err = c.SyncManifests(ctx, added, updated, deleted, ev.Type.IsSyncEvent()); err != nil {
			log.Errorw("failed to sync Ingress to apisix",
				zap.Error(err),
			)
			goto updateStatus
		}
	}
updateStatus:
	// Status is refreshed asynchronously on the worker pool; the work unit is
	// skipped if it was cancelled before running.
	c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}
		c.UpdateStatus(ing)
		return true, nil
	})
	return err
}