pkg/providers/k8s/endpoint/base.go (100 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package endpoint
import (
"context"
"fmt"
"go.uber.org/zap"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/providers/translation"
providertypes "github.com/apache/apisix-ingress-controller/pkg/providers/types"
"github.com/apache/apisix-ingress-controller/pkg/providers/utils"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
type baseEndpointController struct {
*providertypes.Common
translator translation.Translator
apisixUpstreamLister kube.ApisixUpstreamLister
svcLister listerscorev1.ServiceLister
}
func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
log.Debugw("endpoint controller syncing endpoint",
zap.Any("endpoint", ep),
)
namespace, err := ep.Namespace()
if err != nil {
return err
}
svcName := ep.ServiceName()
svc, err := c.svcLister.Services(namespace).Get(svcName)
if err != nil {
if k8serrors.IsNotFound(err) {
return c.syncEmptyEndpoint(ctx, ep)
}
log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err)
return err
}
switch c.Kubernetes.APIVersion {
case config.ApisixV2:
var subsets []configv2.ApisixUpstreamSubset
subsets = append(subsets, configv2.ApisixUpstreamSubset{})
auKube, err := c.apisixUpstreamLister.V2(namespace, svcName)
if auKube != nil && auKube.V2().Spec != nil &&
!utils.MatchCRDsIngressClass(auKube.V2().Spec.IngressClassName, c.Kubernetes.IngressClass) {
auKube = nil
}
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
return err
}
} else if auKube != nil && auKube.V2().Spec != nil && len(auKube.V2().Spec.Subsets) > 0 {
subsets = append(subsets, auKube.V2().Spec.Subsets...)
}
clusters := c.APISIX.ListClusters()
for _, port := range svc.Spec.Ports {
for _, subset := range subsets {
nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)
if err != nil {
log.Errorw("failed to translate upstream nodes",
zap.Error(err),
zap.Any("endpoints", ep),
zap.Int32("port", port.Port),
)
}
name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
for _, cluster := range clusters {
if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
return err
}
}
}
}
default:
panic(fmt.Errorf("unsupported ApisixUpstream version %v", c.Kubernetes.APIVersion))
}
return nil
}
func (c *baseEndpointController) syncEmptyEndpoint(ctx context.Context, ep kube.Endpoint) error {
namespace, err := ep.Namespace()
if err != nil {
return err
}
svcName := ep.ServiceName()
log.Debugw("The service has been deleted, try to delete upstream relation",
zap.String("namespace", namespace),
zap.String("service_name", svcName),
)
clusterName := c.Config.APISIX.DefaultClusterName
err = c.APISIX.Cluster(clusterName).UpstreamServiceRelation().Delete(ctx, namespace+"_"+svcName)
if err != nil {
log.Errorw("delete upstream relation failed",
zap.Error(err),
)
}
return nil
}