func()

in pkg/providers/apisix/apisix_upstream.go [134:280]


func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) error {
	event := ev.Object.(kube.ApisixUpstreamEvent)
	key := event.Key
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.Errorf("found ApisixUpstream resource with invalid meta namespace key %s: %s", key, err)
		return err
	}

	var multiVersioned kube.ApisixUpstream
	switch event.GroupVersion {
	case config.ApisixV2:
		multiVersioned, err = c.ApisixUpstreamLister.V2(namespace, name)
	default:
		return fmt.Errorf("unsupported ApisixUpstream group version %s", event.GroupVersion)
	}

	if err != nil {
		if !k8serrors.IsNotFound(err) {
			log.Errorw("failed to get ApisixUpstream",
				zap.Error(err),
				zap.String("key", key),
				zap.String("version", event.GroupVersion),
			)
			return err
		}
		if ev.Type == types.EventSync {
			// ignore not found error in delay sync
			return nil
		}
		if ev.Type != types.EventDelete {
			log.Warnw("ApisixUpstream was deleted before it can be delivered",
				zap.String("key", key),
				zap.String("version", event.GroupVersion),
			)
			// Don't need to retry.
			return nil
		}
	}
	if ev.Type == types.EventDelete {
		if multiVersioned != nil {
			// We still find the resource while we are processing the DELETE event,
			// that means object with same namespace and name was created, discarding
			// this stale DELETE event.
			log.Warnf("discard the stale ApisixUpstream delete event since the %s exists", key)
			return nil
		}
		multiVersioned = ev.Tombstone.(kube.ApisixUpstream)
	}

	c.syncRelationship(ev, key, multiVersioned)

	var errRecord error
	switch event.GroupVersion {
	case config.ApisixV2:
		au := multiVersioned.V2()
		if au.Spec == nil {
			return nil
		}

		// We will prioritize ExternalNodes and Discovery.
		if len(au.Spec.ExternalNodes) != 0 || au.Spec.Discovery != nil {
			var newUps *apisixv1.Upstream
			if ev.Type != types.EventDelete {
				cfg := &au.Spec.ApisixUpstreamConfig
				newUps, err = c.translator.TranslateUpstreamConfigV2(cfg)
				if err != nil {
					log.Errorw("failed to translate upstream config",
						zap.Any("object", au),
						zap.Error(err),
					)
					errRecord = err
					goto updateStatus
				}
			}

			if len(au.Spec.ExternalNodes) != 0 {
				errRecord = c.updateExternalNodes(ctx, au, nil, newUps, au.Namespace, au.Name, ev.Type.IsSyncEvent())
				goto updateStatus
			}

			// for service discovery related configuration
			if au.Spec.Discovery.ServiceName == "" || au.Spec.Discovery.Type == "" {
				log.Error("If you setup Discovery for ApisixUpstream, you need to specify the ServiceName and Type fields.")
				errRecord = fmt.Errorf("No ServiceName or Type fields found")
				goto updateStatus
			}
			// updateUpstream for real
			upsName := apisixv1.ComposeExternalUpstreamName(au.Namespace, au.Name)
			errRecord = c.updateUpstream(ctx, upsName, &au.Spec.ApisixUpstreamConfig, ev.Type.IsSyncEvent())
			goto updateStatus
		}

		var portLevelSettings map[int32]configv2.ApisixUpstreamConfig
		if len(au.Spec.PortLevelSettings) > 0 {
			portLevelSettings = make(map[int32]configv2.ApisixUpstreamConfig, len(au.Spec.PortLevelSettings))
			for _, port := range au.Spec.PortLevelSettings {
				portLevelSettings[port.Port] = port.ApisixUpstreamConfig
			}
		}

		svc, err := c.SvcLister.Services(namespace).Get(name)
		if err != nil {
			log.Errorf("failed to get service %s: %s", key, err)
			errRecord = err
			goto updateStatus
		}

		var subsets []configv2.ApisixUpstreamSubset
		subsets = append(subsets, configv2.ApisixUpstreamSubset{})
		if len(au.Spec.Subsets) > 0 {
			subsets = append(subsets, au.Spec.Subsets...)
		}
		for _, port := range svc.Spec.Ports {
			for _, subset := range subsets {
				var cfg configv2.ApisixUpstreamConfig
				if ev.Type != types.EventDelete {
					var ok bool
					cfg, ok = portLevelSettings[port.Port]
					if !ok {
						cfg = au.Spec.ApisixUpstreamConfig
					}
				}

				err := c.updateUpstream(ctx, apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Endpoint), &cfg, ev.Type.IsSyncEvent())
				if err != nil {
					errRecord = err
					goto updateStatus
				}
				err = c.updateUpstream(ctx, apisixv1.ComposeUpstreamName(namespace, name, subset.Name, port.Port, types.ResolveGranularity.Service), &cfg, ev.Type.IsSyncEvent())
				if err != nil {
					errRecord = err
					goto updateStatus
				}
			}
		}
	}
updateStatus:
	c.pool.Queue(func(wu pool.WorkUnit) (interface{}, error) {
		if wu.IsCancelled() {
			return nil, nil
		}
		c.updateStatus(multiVersioned, errRecord)
		return true, nil
	})
	return errRecord
}