pkg/gateway/model_build_targets.go (199 lines of code) (raw):
package gateway
import (
"context"
"errors"
"strconv"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"github.com/aws/aws-sdk-go/aws"
discoveryv1 "k8s.io/api/discovery/v1"
)
const (
portAnnotationsKey = "application-networking.k8s.aws/port"
undefinedPort = int32(0)
)
type LatticeTargetsBuilder interface {
Build(ctx context.Context, service *corev1.Service, backendRef core.BackendRef, stackTgId string) (core.Stack, error)
BuildForServiceExport(ctx context.Context, serviceExport *anv1alpha1.ServiceExport, stackTgId string) (core.Stack, error)
}
type LatticeTargetsModelBuilder struct {
log gwlog.Logger
client client.Client
stack core.Stack
}
func NewTargetsBuilder(
log gwlog.Logger,
client client.Client,
stack core.Stack,
) *LatticeTargetsModelBuilder {
return &LatticeTargetsModelBuilder{
log: log,
client: client,
stack: stack,
}
}
func (b *LatticeTargetsModelBuilder) Build(ctx context.Context, service *corev1.Service,
backendRef core.BackendRef, stackTgId string) (core.Stack, error) {
return b.build(ctx, nil, service, backendRef, b.stack, stackTgId)
}
func (b *LatticeTargetsModelBuilder) BuildForServiceExport(ctx context.Context,
serviceExport *anv1alpha1.ServiceExport, stackTgId string) (core.Stack, error) {
return b.build(ctx, serviceExport, nil, nil, b.stack, stackTgId)
}
func (b *LatticeTargetsModelBuilder) build(ctx context.Context,
serviceExport *anv1alpha1.ServiceExport,
service *corev1.Service, backendRef core.BackendRef,
stack core.Stack, stackTgId string,
) (core.Stack, error) {
isServiceExport := serviceExport != nil
isBackendRef := service != nil && backendRef != nil
if !(isServiceExport || isBackendRef) {
return nil, errors.New("either service export or route/service/backendRef must be specified")
}
if isServiceExport && isBackendRef {
return nil, errors.New("either service export or route/service/backendRef must be specified, but not both")
}
if isServiceExport {
b.log.Debugf(ctx, "Processing targets for service export %s-%s", serviceExport.Name, serviceExport.Namespace)
serviceName := types.NamespacedName{
Namespace: serviceExport.Namespace,
Name: serviceExport.Name,
}
tmpSvc := &corev1.Service{}
if err := b.client.Get(ctx, serviceName, tmpSvc); err != nil {
return nil, err
}
service = tmpSvc
} else {
b.log.Debugf(ctx, "Processing targets for service %s-%s", service.Name, service.Namespace)
}
if stack == nil {
stack = core.NewDefaultStack(core.StackID(k8s.NamespacedName(service)))
}
if !service.DeletionTimestamp.IsZero() {
b.log.Debugf(ctx, "service %s/%s is deleted, skipping target build", service.Name, service.Namespace)
return stack, nil
}
task := &latticeTargetsModelBuildTask{
log: b.log,
client: b.client,
serviceExport: serviceExport,
service: service,
backendRef: backendRef,
stack: stack,
stackTgId: stackTgId,
}
if err := task.run(ctx); err != nil {
return nil, err
}
return task.stack, nil
}
func (t *latticeTargetsModelBuildTask) run(ctx context.Context) error {
return t.buildLatticeTargets(ctx)
}
func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context) error {
definedPorts := t.getDefinedPorts()
// A service port MUST have a name if there are multiple ports exposed from a service.
// Therefore, if a port is named, endpoint port is only relevant if it has the same name.
//
// If a service port is unnamed, it MUST be the only port that is exposed from a service.
// In this case, as long as the service port is matching with backendRef/annotations,
// we can consider all endpoints valid.
servicePortNames := make(map[string]struct{})
skipMatch := false
for _, port := range t.service.Spec.Ports {
if _, ok := definedPorts[port.Port]; ok {
if port.Name != "" {
servicePortNames[port.Name] = struct{}{}
} else {
// Unnamed, consider all endpoints valid
skipMatch = true
}
}
}
// Having no backendRef port makes all endpoints valid - this is mainly for backwards compatibility.
if len(definedPorts) == 0 {
skipMatch = true
}
var targetList []model.Target
if t.service.DeletionTimestamp.IsZero() {
var err error
targetList, err = t.getTargetListFromEndpoints(ctx, servicePortNames, skipMatch)
if err != nil {
return err
}
}
spec := model.TargetsSpec{
StackTargetGroupId: t.stackTgId,
TargetList: targetList,
}
_, err := model.NewTargets(t.stack, spec)
if err != nil {
return err
}
return nil
}
func (t *latticeTargetsModelBuildTask) getTargetListFromEndpoints(ctx context.Context, servicePortNames map[string]struct{}, skipMatch bool) ([]model.Target, error) {
epSlices := &discoveryv1.EndpointSliceList{}
if err := t.client.List(ctx, epSlices,
client.InNamespace(t.service.Namespace),
client.MatchingLabels{discoveryv1.LabelServiceName: t.service.Name}); err != nil {
return nil, err
}
var targetList []model.Target
for _, epSlice := range epSlices.Items {
for _, port := range epSlice.Ports {
// Note that the Endpoint's port name is from ServicePort, but the actual registered port
// is from Pods(targets).
if _, ok := servicePortNames[aws.StringValue(port.Name)]; ok || skipMatch {
for _, ep := range epSlice.Endpoints {
for _, address := range ep.Addresses {
// Do not model terminating endpoints so that they can deregister.
if aws.BoolValue(ep.Conditions.Terminating) {
continue
}
target := model.Target{
TargetIP: address,
Port: int64(aws.Int32Value(port.Port)),
Ready: aws.BoolValue(ep.Conditions.Ready),
}
if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
target.TargetRef = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name}
}
targetList = append(targetList, target)
}
}
}
}
}
return targetList, nil
}
func (t *latticeTargetsModelBuildTask) getDefinedPorts() map[int32]struct{} {
definedPorts := make(map[int32]struct{})
isServiceExport := t.serviceExport != nil
if isServiceExport {
portsAnnotations := strings.Split(t.serviceExport.Annotations[portAnnotationsKey], ",")
for _, portAnnotation := range portsAnnotations {
if portAnnotation != "" {
definedPort, err := strconv.ParseInt(portAnnotation, 10, 32)
if err != nil {
t.log.Infof(context.TODO(), "failed to read Annotations/Port: %s due to %s",
t.serviceExport.Annotations[portAnnotationsKey], err)
} else {
definedPorts[int32(definedPort)] = struct{}{}
}
}
}
} else if t.backendRef.Port() != nil {
backendRefPort := int32(*t.backendRef.Port())
if backendRefPort != undefinedPort {
definedPorts[backendRefPort] = struct{}{}
}
}
return definedPorts
}
type latticeTargetsModelBuildTask struct {
log gwlog.Logger
client client.Client
serviceExport *anv1alpha1.ServiceExport
service *corev1.Service
backendRef core.BackendRef
stack core.Stack
stackTgId string
}