pkg/gateway/model_build_targetgroup.go (383 lines of code) (raw):
package gateway
import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/service/vpclattice"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1"
"github.com/aws/aws-application-networking-k8s/pkg/config"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
policy "github.com/aws/aws-application-networking-k8s/pkg/k8s/policyhelper"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
)
type (
TGP = anv1alpha1.TargetGroupPolicy
)
type InvalidBackendRefError struct {
BackendRef core.BackendRef
Reason string
}
func (e *InvalidBackendRefError) Error() string {
return e.Reason
}
//go:generate mockgen -destination model_build_targetgroup_mock.go -package gateway github.com/aws/aws-application-networking-k8s/pkg/gateway SvcExportTargetGroupModelBuilder,BackendRefTargetGroupModelBuilder
type SvcExportTargetGroupModelBuilder interface {
// used during standard model build
Build(ctx context.Context, svcExport *anv1alpha1.ServiceExport) (core.Stack, error)
// used for reconciliation of existing target groups against a service export object
BuildTargetGroup(ctx context.Context, svcExport *anv1alpha1.ServiceExport) (*model.TargetGroup, error)
}
type SvcExportTargetGroupBuilder struct {
log gwlog.Logger
client client.Client
}
func NewSvcExportTargetGroupBuilder(
log gwlog.Logger,
client client.Client,
) *SvcExportTargetGroupBuilder {
return &SvcExportTargetGroupBuilder{
log: log,
client: client,
}
}
type svcExportTargetGroupModelBuildTask struct {
log gwlog.Logger
client client.Client
tgp *policy.PolicyHandler[*TGP]
serviceExport *anv1alpha1.ServiceExport
stack core.Stack
}
func (b *SvcExportTargetGroupBuilder) Build(
ctx context.Context,
svcExport *anv1alpha1.ServiceExport,
) (core.Stack, error) {
stack := core.NewDefaultStack(core.StackID(k8s.NamespacedName(svcExport)))
task := &svcExportTargetGroupModelBuildTask{
log: b.log,
serviceExport: svcExport,
stack: stack,
client: b.client,
tgp: policy.NewTargetGroupPolicyHandler(b.log, b.client),
}
if err := task.run(ctx); err != nil {
return nil, err
}
return task.stack, nil
}
func (b *SvcExportTargetGroupBuilder) BuildTargetGroup(ctx context.Context, svcExport *anv1alpha1.ServiceExport) (*model.TargetGroup, error) {
stack := core.NewDefaultStack(core.StackID(k8s.NamespacedName(svcExport)))
task := &svcExportTargetGroupModelBuildTask{
log: b.log,
serviceExport: svcExport,
stack: stack,
client: b.client,
tgp: policy.NewTargetGroupPolicyHandler(b.log, b.client),
}
return task.buildTargetGroup(ctx)
}
func (t *svcExportTargetGroupModelBuildTask) run(ctx context.Context) error {
tg, err := t.buildTargetGroup(ctx)
if err != nil {
return fmt.Errorf("failed to build target group for service export %s-%s due to %w",
t.serviceExport.Name, t.serviceExport.Namespace, err)
}
if !tg.IsDeleted {
err = t.buildTargets(ctx, tg.ID())
if err != nil {
t.log.Debugf(ctx, "Failed to build targets for service export %s-%s due to %s",
t.serviceExport.Name, t.serviceExport.Namespace, err)
return err
}
}
return nil
}
func (t *svcExportTargetGroupModelBuildTask) buildTargets(ctx context.Context, stackTgId string) error {
targetsBuilder := NewTargetsBuilder(t.log, t.client, t.stack)
_, err := targetsBuilder.BuildForServiceExport(ctx, t.serviceExport, stackTgId)
if err != nil {
return err
}
return nil
}
func (t *svcExportTargetGroupModelBuildTask) buildTargetGroup(ctx context.Context) (*model.TargetGroup, error) {
svc := &corev1.Service{}
noSvcFoundAndDeleting := false
if err := t.client.Get(ctx, k8s.NamespacedName(t.serviceExport), svc); err != nil {
if apierrors.IsNotFound(err) && !t.serviceExport.DeletionTimestamp.IsZero() {
// if we're deleting, it's OK if the service isn't there
noSvcFoundAndDeleting = true
} else { // either it's some other error or we aren't deleting
return nil, fmt.Errorf("failed to find corresponding k8sService %s, error :%w ",
k8s.NamespacedName(t.serviceExport), err)
}
}
var ipAddressType string
var err error
if noSvcFoundAndDeleting {
ipAddressType = "IPV4" // just pick a default
} else {
ipAddressType, err = buildTargetGroupIpAddressType(svc)
if err != nil {
return nil, err
}
}
tgp, err := t.tgp.ObjResolvedPolicy(ctx, t.serviceExport)
if err != nil {
return nil, err
}
protocol, protocolVersion, healthCheckConfig, err := parseTargetGroupConfig(tgp)
if err != nil {
return nil, err
}
spec := model.TargetGroupSpec{
Type: model.TargetGroupTypeIP,
Port: 80,
Protocol: protocol,
ProtocolVersion: protocolVersion,
IpAddressType: ipAddressType,
HealthCheckConfig: healthCheckConfig,
}
spec.VpcId = config.VpcID
spec.K8SSourceType = model.SourceTypeSvcExport
spec.K8SClusterName = config.ClusterName
spec.K8SServiceName = t.serviceExport.Name
spec.K8SServiceNamespace = t.serviceExport.Namespace
spec.K8SProtocolVersion = protocolVersion
stackTG, err := model.NewTargetGroup(t.stack, spec)
if err != nil {
return nil, err
}
stackTG.IsDeleted = !t.serviceExport.DeletionTimestamp.IsZero()
return stackTG, nil
}
type BackendRefTargetGroupModelBuilder interface {
Build(ctx context.Context, route core.Route, backendRef core.BackendRef, stack core.Stack) (core.Stack, *model.TargetGroup, error)
}
type BackendRefTargetGroupBuilder struct {
log gwlog.Logger
client client.Client
}
func NewBackendRefTargetGroupBuilder(log gwlog.Logger, client client.Client) BackendRefTargetGroupModelBuilder {
return &BackendRefTargetGroupBuilder{
log: log,
client: client,
}
}
type backendRefTargetGroupModelBuildTask struct {
log gwlog.Logger
client client.Client
stack core.Stack
route core.Route
backendRef core.BackendRef
tgp *policy.PolicyHandler[*TGP]
}
func (b *BackendRefTargetGroupBuilder) Build(
ctx context.Context,
route core.Route,
backendRef core.BackendRef,
stack core.Stack,
) (core.Stack, *model.TargetGroup, error) {
if stack == nil {
stack = core.NewDefaultStack(core.StackID(k8s.NamespacedName(route.K8sObject())))
b.log.Debugf(ctx, "Creating new stack for build task")
}
task := backendRefTargetGroupModelBuildTask{
log: b.log,
client: b.client,
stack: stack,
route: route,
backendRef: backendRef,
tgp: policy.NewTargetGroupPolicyHandler(b.log, b.client),
}
stackTg, err := task.buildTargetGroup(ctx)
if err != nil {
return nil, nil, err
}
return task.stack, stackTg, nil
}
func (t *backendRefTargetGroupModelBuildTask) buildTargetGroup(ctx context.Context) (*model.TargetGroup, error) {
if string(*t.backendRef.Kind()) == "ServiceImport" {
return nil, errors.New("not supported for ServiceImport BackendRef")
}
tgSpec, err := t.buildTargetGroupSpec(ctx)
if err != nil {
return nil, fmt.Errorf("buildTargetGroupSpec err %w", err)
}
stackTG, err := model.NewTargetGroup(t.stack, tgSpec)
if err != nil {
return nil, err
}
t.log.Debugf(ctx, "Added target group for backendRef %s to the stack %s", t.backendRef.Name(), stackTG.ID())
stackTG.IsDeleted = !t.route.DeletionTimestamp().IsZero() // should always be false
if !stackTG.IsDeleted {
t.buildTargets(ctx, stackTG.ID())
}
return stackTG, nil
}
func (t *backendRefTargetGroupModelBuildTask) buildTargets(ctx context.Context, stackTgId string) error {
if string(*t.backendRef.Kind()) == "ServiceImport" {
t.log.Debugf(ctx, "Service import does not manage targets, returning")
return nil
}
backendRefNsName := getBackendRefNsName(t.route, t.backendRef)
svc := &corev1.Service{}
if err := t.client.Get(ctx, backendRefNsName, svc); err != nil {
return fmt.Errorf("error finding backend service %s due to %s", backendRefNsName, err)
}
targetsBuilder := NewTargetsBuilder(t.log, t.client, t.stack)
_, err := targetsBuilder.Build(ctx, svc, t.backendRef, stackTgId)
if err != nil {
return err
}
return nil
}
// Now, Only k8sService and serviceImport creation deletion use this function to build TargetGroupSpec, serviceExport does not use this function to create TargetGroupSpec
func (t *backendRefTargetGroupModelBuildTask) buildTargetGroupSpec(ctx context.Context) (model.TargetGroupSpec, error) {
// note we only build target groups for backendRefs on non-deleted routes
backendKind := string(*t.backendRef.Kind())
t.log.Debugf(ctx, "buildTargetGroupSpec, kind %s", backendKind)
vpc := config.VpcID
eksCluster := config.ClusterName
backendRefNsName := getBackendRefNsName(t.route, t.backendRef)
svc := &corev1.Service{}
if err := t.client.Get(ctx, backendRefNsName, svc); err != nil {
if apierrors.IsNotFound(err) {
return model.TargetGroupSpec{}, &InvalidBackendRefError{
BackendRef: t.backendRef,
Reason: fmt.Sprintf("service %s on route %s not found, backendRef invalid", backendRefNsName.Name, t.route.Name()),
}
} else {
return model.TargetGroupSpec{},
fmt.Errorf("error finding backend service %s due to %s", backendRefNsName, err)
}
}
var err error
ipAddressType, err := buildTargetGroupIpAddressType(svc)
if err != nil {
return model.TargetGroupSpec{}, err
}
tgp, err := t.tgp.ObjResolvedPolicy(ctx, svc)
if err != nil {
return model.TargetGroupSpec{}, err
}
protocol, protocolVersion, healthCheckConfig, err := parseTargetGroupConfig(tgp)
if err != nil {
return model.TargetGroupSpec{}, err
}
var parentRefType model.K8SSourceType
switch t.route.(type) {
case *core.HTTPRoute:
parentRefType = model.SourceTypeHTTPRoute
case *core.GRPCRoute:
// protocolVersion:GRPC takes precedence over other protocolVersions for k8s svc backendref by GRPCRoutes
protocolVersion = vpclattice.TargetGroupProtocolVersionGrpc
parentRefType = model.SourceTypeGRPCRoute
case *core.TLSRoute:
// protocol:TCP takes precedence over other protocol for k8s svc backendref by TLSRoutes
protocol = vpclattice.TargetGroupProtocolTcp
protocolVersion = ""
parentRefType = model.SourceTypeTLSRoute
default:
return model.TargetGroupSpec{}, fmt.Errorf("unsupported route type %T", t.route)
}
spec := model.TargetGroupSpec{
Type: model.TargetGroupTypeIP,
Port: 80,
Protocol: protocol,
ProtocolVersion: protocolVersion,
IpAddressType: ipAddressType,
HealthCheckConfig: healthCheckConfig,
}
spec.VpcId = vpc
spec.K8SSourceType = parentRefType
spec.K8SClusterName = eksCluster
spec.K8SServiceName = backendRefNsName.Name
spec.K8SServiceNamespace = backendRefNsName.Namespace
spec.K8SRouteName = t.route.Name()
spec.K8SRouteNamespace = t.route.Namespace()
spec.K8SProtocolVersion = protocolVersion
return spec, nil
}
func getBackendRefNsName(route core.Route, backendRef core.BackendRef) types.NamespacedName {
var namespace = route.Namespace()
if backendRef.Namespace() != nil {
namespace = string(*backendRef.Namespace())
}
backendRefNsName := types.NamespacedName{
Namespace: namespace,
Name: string(backendRef.Name()),
}
return backendRefNsName
}
func parseTargetGroupConfig(tgp *anv1alpha1.TargetGroupPolicy) (
protocol string, protocolVersion string, healthCheckConfig *vpclattice.HealthCheckConfig, err error) {
protocol = "HTTP"
protocolVersion = vpclattice.TargetGroupProtocolVersionHttp1
if tgp == nil {
return protocol, protocolVersion, nil, nil
}
if tgp.Spec.Protocol != nil && *tgp.Spec.Protocol == vpclattice.TargetGroupProtocolTcp {
if tgp.Spec.ProtocolVersion != nil {
return "", "", nil, fmt.Errorf("protocolVersion is not supported for TCP protocol TargetGroupPolicy")
}
protocolVersion = ""
}
// Override protocol if specified in the TargetGroupPolicy
if tgp.Spec.Protocol != nil {
protocol = *tgp.Spec.Protocol
}
// Override protocolVersion if specified in the TargetGroupPolicy for non-TCP protocol
if tgp.Spec.ProtocolVersion != nil && protocol != vpclattice.TargetGroupProtocolTcp {
protocolVersion = *tgp.Spec.ProtocolVersion
}
healthCheckConfig = parseHealthCheckConfig(tgp)
return protocol, protocolVersion, healthCheckConfig, nil
}
func parseHealthCheckConfig(tgp *anv1alpha1.TargetGroupPolicy) *vpclattice.HealthCheckConfig {
hc := tgp.Spec.HealthCheck
if hc == nil {
return nil
}
var matcher *vpclattice.Matcher
if hc.StatusMatch != nil {
matcher = &vpclattice.Matcher{HttpCode: hc.StatusMatch}
}
return &vpclattice.HealthCheckConfig{
Enabled: hc.Enabled,
HealthCheckIntervalSeconds: hc.IntervalSeconds,
HealthCheckTimeoutSeconds: hc.TimeoutSeconds,
HealthyThresholdCount: hc.HealthyThresholdCount,
UnhealthyThresholdCount: hc.UnhealthyThresholdCount,
Matcher: matcher,
Path: hc.Path,
Port: hc.Port,
Protocol: (*string)(hc.Protocol),
ProtocolVersion: (*string)(hc.ProtocolVersion),
}
}
func buildTargetGroupIpAddressType(svc *corev1.Service) (string, error) {
ipFamilies := svc.Spec.IPFamilies
if len(ipFamilies) != 1 {
return "", errors.New("lattice Target Group only supports single stack IP addresses")
}
// IpFamilies will always have at least 1 element
ipFamily := ipFamilies[0]
switch ipFamily {
case corev1.IPv4Protocol:
return vpclattice.IpAddressTypeIpv4, nil
case corev1.IPv6Protocol:
return vpclattice.IpAddressTypeIpv6, nil
default:
return "", fmt.Errorf("unknown ipFamily: %s", ipFamily)
}
}
func GetServiceForBackendRef(ctx context.Context, client client.Client, route core.Route, backendRef core.BackendRef) (*corev1.Service, error) {
svc := &corev1.Service{}
key := types.NamespacedName{
Name: string(backendRef.Name()),
}
if backendRef.Namespace() != nil {
key.Namespace = string(*backendRef.Namespace())
} else {
key.Namespace = route.Namespace()
}
if err := client.Get(ctx, key, svc); err != nil {
return nil, err
}
return svc, nil
}