pkg/deploy/lattice/target_group_synthesizer.go (299 lines of code) (raw):

package lattice import ( "context" "errors" "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/vpclattice" apierrors "k8s.io/apimachinery/pkg/api/errors" anv1alpha1 "github.com/aws/aws-application-networking-k8s/pkg/apis/applicationnetworking/v1alpha1" "github.com/aws/aws-application-networking-k8s/pkg/gateway" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" "github.com/aws/aws-application-networking-k8s/pkg/config" "github.com/aws/aws-application-networking-k8s/pkg/model/core" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" ) // helpful for testing/mocking func NewTargetGroupSynthesizer( log gwlog.Logger, cloud pkg_aws.Cloud, client client.Client, tgManager TargetGroupManager, svcExportTgBuilder gateway.SvcExportTargetGroupModelBuilder, svcBuilder gateway.LatticeServiceBuilder, stack core.Stack, ) *TargetGroupSynthesizer { return &TargetGroupSynthesizer{ log: log, cloud: cloud, client: client, targetGroupManager: tgManager, svcExportTgBuilder: svcExportTgBuilder, svcBuilder: svcBuilder, stack: stack, } } type TargetGroupSynthesizer struct { log gwlog.Logger cloud pkg_aws.Cloud client client.Client targetGroupManager TargetGroupManager stack core.Stack svcExportTgBuilder gateway.SvcExportTargetGroupModelBuilder svcBuilder gateway.LatticeServiceBuilder } func (t *TargetGroupSynthesizer) Synthesize(ctx context.Context) error { err1 := t.SynthesizeCreate(ctx) err2 := t.SynthesizeDelete(ctx) return errors.Join(err1, err2) } func (t *TargetGroupSynthesizer) SynthesizeCreate(ctx context.Context) error { var resTargetGroups []*model.TargetGroup var returnErr = false err := t.stack.ListResources(&resTargetGroups) if err != nil { return err } for _, resTargetGroup := range resTargetGroups { if resTargetGroup.IsDeleted { continue } prefix := model.TgNamePrefix(resTargetGroup.Spec) tgStatus, err := t.targetGroupManager.Upsert(ctx, resTargetGroup) if err == nil { resTargetGroup.Status = &tgStatus } else { t.log.Debugf(ctx, "Failed TargetGroupManager.Upsert %s due to %s", prefix, err) returnErr = true } } if returnErr { return fmt.Errorf("error during target group synthesis, will retry") } return nil } func (t *TargetGroupSynthesizer) SynthesizeDelete(ctx context.Context) error { var resTargetGroups []*model.TargetGroup err := t.stack.ListResources(&resTargetGroups) if err != nil { return err } var retErr error for _, resTargetGroup := range resTargetGroups { if !resTargetGroup.IsDeleted { continue } err := t.targetGroupManager.Delete(ctx, resTargetGroup) if err != nil { prefix := model.TgNamePrefix(resTargetGroup.Spec) retErr = errors.Join(retErr, fmt.Errorf("failed TargetGroupManager.Delete %s due to %s", prefix, err)) } } if retErr != nil { return retErr } return nil } // result of deletion attempt, if err is nil target group was deleted type DeleteUnusedResult struct { Arn string Err error } // This method assumes all synthesis. Returns list of deletion results, might include partial // failures if cannot produce list for deletion will return error. // // TODO: we should do parallel deletion calls, preferably with bounded WorkGroup func (t *TargetGroupSynthesizer) SynthesizeUnusedDelete(ctx context.Context) ([]DeleteUnusedResult, error) { tgsToDelete, err := t.calculateTargetGroupsToDelete(ctx) if err != nil { return nil, err } results := make([]DeleteUnusedResult, len(tgsToDelete)) for i, tg := range tgsToDelete { modelStatus := model.TargetGroupStatus{ Name: aws.StringValue(tg.tgSummary.Name), Arn: aws.StringValue(tg.tgSummary.Arn), Id: aws.StringValue(tg.tgSummary.Id), } modelTg := model.TargetGroup{ Status: &modelStatus, IsDeleted: true, } err := t.targetGroupManager.Delete(ctx, &modelTg) results[i] = DeleteUnusedResult{ Arn: modelTg.Status.Arn, Err: err, } } return results, nil } func (t *TargetGroupSynthesizer) calculateTargetGroupsToDelete(ctx context.Context) ([]tgListOutput, error) { latticeTgs, err := t.targetGroupManager.List(ctx) if err != nil { return latticeTgs, fmt.Errorf("failed TargetGroupManager.List due to %s", err) } var tgsToDelete []tgListOutput // we check existing target groups to see if they are still in use - this is necessary as // some changes to existing service exports or routes will simply create new target groups, // for example on protocol changes for _, latticeTg := range latticeTgs { if !t.hasTags(latticeTg) || !t.vpcMatchesConfig(latticeTg) { continue } // TGs from earlier releases will require 1-time manual cleanup // this method of validation only covers TGs created by this build // of the controller forward tagFields := model.TGTagFieldsFromTags(latticeTg.tags) if !t.hasExpectedTags(latticeTg, tagFields) { continue } // most importantly, is the tg in use? if len(latticeTg.tgSummary.ServiceArns) > 0 { t.log.Debugf(ctx, "TargetGroup %s (%s) is referenced by lattice service", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) continue } if tagFields.K8SSourceType == model.SourceTypeSvcExport { if t.shouldDeleteSvcExportTg(ctx, latticeTg, tagFields) { tgsToDelete = append(tgsToDelete, latticeTg) } } else { if t.shouldDeleteRouteTg(ctx, latticeTg, tagFields) { tgsToDelete = append(tgsToDelete, latticeTg) } } } return tgsToDelete, nil } func (t *TargetGroupSynthesizer) shouldDeleteSvcExportTg( ctx context.Context, latticeTg tgListOutput, tagFields model.TargetGroupTagFields) bool { svcExportName := types.NamespacedName{ Namespace: tagFields.K8SServiceNamespace, Name: tagFields.K8SServiceName, } t.log.Debugf(ctx, "TargetGroup %s (%s) is referenced by ServiceExport", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) svcExport := &anv1alpha1.ServiceExport{} err := t.client.Get(ctx, svcExportName, svcExport) if err != nil { if apierrors.IsNotFound(err) { // if the service export does not exist, we can safely delete t.log.Infof(ctx, "Will delete TargetGroup %s (%s) - ServiceExport is not found", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return true } else { // skip if we have an unknown error t.log.Infof(ctx, "Received unexpected API error getting service export %s", err) return false } } if !svcExport.DeletionTimestamp.IsZero() { // backing object is deleted, we can delete too t.log.Infof(ctx, "Will delete TargetGroup %s (%s) - ServiceExport has been deleted", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return true } // now we get to the tricky business of seeing if our unused target group actually matches // the current state of the service and service export - the most correct way to do this is to // reconstruct the target group spec from the service export itself, then compare fields modelTg, err := t.svcExportTgBuilder.BuildTargetGroup(ctx, svcExport) if err != nil { t.log.Infof(ctx, "Received error building svc export target group model %s", err) return false } // the main identifiers are validated, just need to check the other essentials. // protocolVersion is not in TG summary so we are bringing it from tags. if int64(modelTg.Spec.Port) != aws.Int64Value(latticeTg.tgSummary.Port) || modelTg.Spec.Protocol != aws.StringValue(latticeTg.tgSummary.Protocol) || modelTg.Spec.ProtocolVersion != tagFields.K8SProtocolVersion || modelTg.Spec.IpAddressType != aws.StringValue(latticeTg.tgSummary.IpAddressType) { // one or more immutable fields differ from the source, so the TG is out of date t.log.Infof(ctx, "Will delete TargetGroup %s (%s) - fields differ from source service/service export", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return true } t.log.Debugf(ctx, "ServiceExport TargetGroup %s (%s) is up to date", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return false } func (t *TargetGroupSynthesizer) shouldDeleteRouteTg( ctx context.Context, latticeTg tgListOutput, tagFields model.TargetGroupTagFields) bool { routeName := types.NamespacedName{ Namespace: tagFields.K8SRouteNamespace, Name: tagFields.K8SRouteName, } var err error var route core.Route if tagFields.K8SProtocolVersion == vpclattice.TargetGroupProtocolVersionGrpc { route, err = core.GetGRPCRoute(ctx, t.client, routeName) } else if *latticeTg.tgSummary.Protocol == vpclattice.TargetGroupProtocolTcp { route, err = core.GetTLSRoute(ctx, t.client, routeName) } else { route, err = core.GetHTTPRoute(ctx, t.client, routeName) } if err != nil { if apierrors.IsNotFound(err) { // if the route does not exist, we can safely delete t.log.Debugf(ctx, "Will delete TargetGroup %s (%s) - Route is not found", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return true } else { // skip if we have an unknown error t.log.Infof(ctx, "Received unexpected API error getting route %s", err) return false } } if !route.DeletionTimestamp().IsZero() { t.log.Debugf(ctx, "Will delete TargetGroup %s (%s) - Route is deleted", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return true } // basically rebuild everything for the route and see if one of the TGs matches routeStack, err := t.svcBuilder.Build(ctx, route) if err != nil { t.log.Infof(ctx, "Received error building route model %s", err) return false } var resTargetGroups []*model.TargetGroup err = routeStack.ListResources(&resTargetGroups) if err != nil { t.log.Infof(ctx, "Error listing stack target groups %s", err) return false } var matchFound bool for _, modelTg := range resTargetGroups { match, err := t.targetGroupManager.IsTargetGroupMatch(ctx, modelTg, latticeTg.tgSummary, &tagFields) if err != nil { t.log.Infof(ctx, "Received error during tg comparison %s", err) continue } if match { t.log.Debugf(ctx, "Route TargetGroup %s (%s) is up to date", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) matchFound = true break } } if !matchFound { t.log.Debugf(ctx, "Will delete TargetGroup %s (%s) - TG is not up to date", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return true // safe to delete } return false } func (t *TargetGroupSynthesizer) hasTags(latticeTg tgListOutput) bool { if latticeTg.tags == nil { t.log.Debugf(context.TODO(), "Ignoring target group %s (%s) because tag fetch was not successful", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return false } return true } func (t *TargetGroupSynthesizer) vpcMatchesConfig(latticeTg tgListOutput) bool { if aws.StringValue(latticeTg.tgSummary.VpcIdentifier) != config.VpcID { t.log.Debugf(context.TODO(), "Ignoring target group %s (%s) because it is not configured for this VPC", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return false } return true } func (t *TargetGroupSynthesizer) hasExpectedTags(latticeTg tgListOutput, tagFields model.TargetGroupTagFields) bool { if tagFields.K8SClusterName != config.ClusterName { t.log.Debugf(context.TODO(), "Ignoring target group %s (%s) because it is not configured for this Cluster", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return false } if tagFields.K8SSourceType == model.SourceTypeInvalid || tagFields.K8SServiceName == "" || tagFields.K8SServiceNamespace == "" { t.log.Infof(context.TODO(), "Ignoring target group %s (%s) as one or more required tags are missing", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return false } // route-based TGs should have the additional route keys if tagFields.IsSourceTypeRoute() && (tagFields.K8SRouteName == "" || tagFields.K8SRouteNamespace == "") { t.log.Infof(context.TODO(), "Ignoring route-based target group %s (%s) as one or more required tags are missing", *latticeTg.tgSummary.Arn, *latticeTg.tgSummary.Name) return false } return true } func (t *TargetGroupSynthesizer) PostSynthesize(ctx context.Context) error { // nothing to do here return nil }