in pkg/providers/k8s/endpoint/base.go [46:105]
// syncEndpoint translates the given endpoint into upstream nodes and pushes
// them to every cluster returned by c.APISIX.ListClusters().
//
// Flow:
//   - The Service named by the endpoint is looked up; if it does not exist the
//     work is delegated to syncEmptyEndpoint.
//   - For the ApisixV2 API version, the ApisixUpstream with the same
//     namespace/name is consulted for extra subsets. It is ignored when it is
//     missing or when its ingress class does not match the controller's.
//   - For each Service port and each subset, nodes are translated and synced to
//     each cluster under the name produced by apisixv1.ComposeUpstreamName.
//
// It returns the first error encountered (endpoint namespace resolution,
// Service/ApisixUpstream lookup other than NotFound, node translation, or
// cluster sync). It panics when c.Kubernetes.APIVersion is not a supported
// ApisixUpstream version.
func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
	log.Debugw("endpoint controller syncing endpoint",
		zap.Any("endpoint", ep),
	)

	namespace, err := ep.Namespace()
	if err != nil {
		return err
	}
	svcName := ep.ServiceName()

	svc, err := c.svcLister.Services(namespace).Get(svcName)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return c.syncEmptyEndpoint(ctx, ep)
		}
		log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err)
		return err
	}

	switch c.Kubernetes.APIVersion {
	case config.ApisixV2:
		// Always start with one zero-value subset (empty name, no labels).
		// NOTE(review): presumably this stands for the upstream that is not
		// restricted to any subset — confirm against ComposeUpstreamName.
		subsets := []configv2.ApisixUpstreamSubset{{}}

		// Check the lookup error before touching the returned object, so the
		// result of a failed lookup is never dereferenced.
		auKube, err := c.apisixUpstreamLister.V2(namespace, svcName)
		if err != nil {
			// A missing ApisixUpstream is not an error: only the zero-value
			// subset is synced in that case.
			if !k8serrors.IsNotFound(err) {
				log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
				return err
			}
		} else if auKube != nil && auKube.V2().Spec != nil {
			spec := auKube.V2().Spec
			// Subsets of an ApisixUpstream whose ingress class does not match
			// this controller's are not used.
			if utils.MatchCRDsIngressClass(spec.IngressClassName, c.Kubernetes.IngressClass) {
				subsets = append(subsets, spec.Subsets...)
			}
		}

		clusters := c.APISIX.ListClusters()
		for _, port := range svc.Spec.Ports {
			for _, subset := range subsets {
				nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)
				if err != nil {
					log.Errorw("failed to translate upstream nodes",
						zap.Error(err),
						zap.Any("endpoints", ep),
						zap.Int32("port", port.Port),
					)
					// Do not sync the result of a failed translation: pushing
					// it could overwrite valid upstream nodes in the clusters.
					return err
				}

				name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
				for _, cluster := range clusters {
					if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
						return err
					}
				}
			}
		}
	default:
		// NOTE(review): the API version is presumably validated at startup, so
		// reaching this branch would indicate a programming/configuration
		// invariant violation — confirm before converting to an error return.
		panic(fmt.Errorf("unsupported ApisixUpstream version %v", c.Kubernetes.APIVersion))
	}
	return nil
}