func()

in pkg/providers/k8s/endpoint/base.go [46:105]


// syncEndpoint pushes the upstream nodes derived from ep to every APISIX
// cluster returned by c.APISIX.ListClusters.
//
// The Service named by ep is looked up through the service lister; when it is
// not found the work is delegated to syncEmptyEndpoint. Otherwise, for every
// Service port and every subset (one implicit zero-value subset plus the
// subsets declared by the ApisixUpstream of the same namespace/name, provided
// its ingress class matches c.Kubernetes.IngressClass) the endpoint is
// translated into upstream nodes and synced under the composed upstream name.
//
// It returns the first error hit while resolving the namespace, fetching the
// Service or ApisixUpstream, translating the endpoint, or syncing to a
// cluster. It panics when c.Kubernetes.APIVersion is not config.ApisixV2.
func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
	log.Debugw("endpoint controller syncing endpoint",
		zap.Any("endpoint", ep),
	)

	namespace, err := ep.Namespace()
	if err != nil {
		return err
	}
	svcName := ep.ServiceName()
	svc, err := c.svcLister.Services(namespace).Get(svcName)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return c.syncEmptyEndpoint(ctx, ep)
		}
		log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err)
		return err
	}

	switch c.Kubernetes.APIVersion {
	case config.ApisixV2:
		// The leading zero-value subset (empty name, no labels) is always
		// synced, whether or not an ApisixUpstream declares extra subsets.
		subsets := []configv2.ApisixUpstreamSubset{{}}

		auKube, err := c.apisixUpstreamLister.V2(namespace, svcName)
		if err != nil {
			// A missing ApisixUpstream is fine: only the default subset is
			// synced. Any other lister error aborts the sync.
			if !k8serrors.IsNotFound(err) {
				log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
				return err
			}
		} else if auKube != nil {
			// Subsets of an ApisixUpstream that belongs to another ingress
			// class are ignored.
			if spec := auKube.V2().Spec; spec != nil &&
				utils.MatchCRDsIngressClass(spec.IngressClassName, c.Kubernetes.IngressClass) {
				subsets = append(subsets, spec.Subsets...)
			}
		}

		clusters := c.APISIX.ListClusters()
		for _, port := range svc.Spec.Ports {
			for _, subset := range subsets {
				nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)
				if err != nil {
					log.Errorw("failed to translate upstream nodes",
						zap.Error(err),
						zap.Any("endpoints", ep),
						zap.Int32("port", port.Port),
					)
					// nodes is meaningless once translation failed. Syncing it
					// anyway would presumably replace the upstream's current
					// nodes with an empty list (confirm against
					// SyncUpstreamNodesChangeToCluster), so surface the error
					// and leave the clusters untouched.
					return err
				}
				name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
				for _, cluster := range clusters {
					if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
						return err
					}
				}
			}
		}
	default:
		panic(fmt.Errorf("unsupported ApisixUpstream version %v", c.Kubernetes.APIVersion))
	}
	return nil
}