pkg/deploy/lattice/service_network_manager.go (219 lines of code) (raw):

package lattice import ( "context" "errors" "fmt" "golang.org/x/exp/slices" "github.com/aws/aws-application-networking-k8s/pkg/aws/services" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/vpclattice" pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" "github.com/aws/aws-application-networking-k8s/pkg/config" model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" "github.com/aws/aws-application-networking-k8s/pkg/utils" ) //go:generate mockgen -destination service_network_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice ServiceNetworkManager type ServiceNetworkManager interface { UpsertVpcAssociation(ctx context.Context, snName string, sgIds []*string) (string, error) DeleteVpcAssociation(ctx context.Context, snName string) error CreateOrUpdate(ctx context.Context, serviceNetwork *model.ServiceNetwork) (model.ServiceNetworkStatus, error) } func NewDefaultServiceNetworkManager(log gwlog.Logger, cloud pkg_aws.Cloud) *defaultServiceNetworkManager { return &defaultServiceNetworkManager{ log: log, cloud: cloud, } } type defaultServiceNetworkManager struct { log gwlog.Logger cloud pkg_aws.Cloud } func (m *defaultServiceNetworkManager) UpsertVpcAssociation(ctx context.Context, snName string, sgIds []*string) (string, error) { sn, err := m.cloud.Lattice().FindServiceNetwork(ctx, snName) if err != nil { return "", err } snva, err := m.getActiveVpcAssociation(ctx, *sn.SvcNetwork.Id) if err != nil { return "", err } if snva != nil { // association is active owned, err := m.cloud.TryOwn(ctx, *snva.Arn) if err != nil { return "", err } if !owned { return "", services.NewConflictError("snva", snName, fmt.Sprintf("Found existing vpc association not owned by controller: %s", *snva.Arn)) } _, err = m.updateServiceNetworkVpcAssociation(ctx, &sn.SvcNetwork, sgIds, snva.Id) if err != nil { return "", err } return *snva.Arn, nil } else { req := vpclattice.CreateServiceNetworkVpcAssociationInput{ ServiceNetworkIdentifier: sn.SvcNetwork.Id, VpcIdentifier: &config.VpcID, SecurityGroupIds: sgIds, Tags: m.cloud.DefaultTags(), } resp, err := m.cloud.Lattice().CreateServiceNetworkVpcAssociationWithContext(ctx, &req) if err != nil { return "", err } switch status := aws.StringValue(resp.Status); status { case vpclattice.ServiceNetworkVpcAssociationStatusActive: return *resp.Arn, nil default: return *resp.Arn, fmt.Errorf("%w, vpc association status in %s", RetryErr, status) } } } func (m *defaultServiceNetworkManager) DeleteVpcAssociation(ctx context.Context, snName string) error { sn, err := m.cloud.Lattice().FindServiceNetwork(ctx, snName) if err != nil { return err } snva, err := m.getActiveVpcAssociation(ctx, *sn.SvcNetwork.Id) if err != nil { return err } if snva != nil { // association is active m.log.Debugf(ctx, "Disassociating ServiceNetwork %s from VPC", snName) owned, err := m.cloud.IsArnManaged(ctx, *snva.Arn) if err != nil { // TODO check for vpclattice.ErrCodeAccessDeniedException or a new error type ErrorCodeNotFoundException // when the api no longer responds with a 404 NotFoundException instead of either of the above. // ErrorCodeNotFoundException currently not part of the golang sdk for the lattice api. This a is a distinct // error from vpclattice.ErrCodeResourceNotFoundException. // In a scenario that the vpc association is created by a foreign account, // the owner account's controller cannot read the tags of this ServiceNetworkVpcAssociation, // and AccessDeniedException is expected. m.log.Warnf(ctx, "skipping delete vpc association, association: %s, error: %s", *snva.Arn, err) return nil } if !owned { m.log.Infof(ctx, "Association %s for %s not owned by controller, skipping deletion", *snva.Arn, snName) return nil } deleteServiceNetworkVpcAssociationInput := vpclattice.DeleteServiceNetworkVpcAssociationInput{ ServiceNetworkVpcAssociationIdentifier: snva.Id, } resp, err := m.cloud.Lattice().DeleteServiceNetworkVpcAssociationWithContext(ctx, &deleteServiceNetworkVpcAssociationInput) if err != nil { m.log.Infof(ctx, "Failed to delete association %s for %s, with response %s and err %s", *snva.Arn, snName, resp, err.Error()) } return errors.New(LATTICE_RETRY) } return nil } func (m *defaultServiceNetworkManager) getActiveVpcAssociation(ctx context.Context, serviceNetworkId string) (*vpclattice.ServiceNetworkVpcAssociationSummary, error) { vpcLatticeSess := m.cloud.Lattice() associationStatusInput := vpclattice.ListServiceNetworkVpcAssociationsInput{ ServiceNetworkIdentifier: &serviceNetworkId, VpcIdentifier: &config.VpcID, } resp, err := vpcLatticeSess.ListServiceNetworkVpcAssociationsAsList(ctx, &associationStatusInput) if err != nil { return nil, err } if len(resp) == 0 { return nil, nil } // There can be at most one response for this snva := resp[0] if aws.StringValue(snva.Status) == vpclattice.ServiceNetworkVpcAssociationStatusActive { return snva, nil } m.log.Debugf(ctx, "snva %s status: %s", aws.StringValue(snva.Arn), aws.StringValue(snva.Status)) switch aws.StringValue(snva.Status) { case vpclattice.ServiceNetworkVpcAssociationStatusActive, vpclattice.ServiceNetworkVpcAssociationStatusDeleteFailed, vpclattice.ServiceNetworkVpcAssociationStatusUpdateFailed: // the resource exists return snva, nil case vpclattice.ServiceNetworkVpcAssociationStatusCreateFailed: // consider it does not exist return nil, nil default: // a mutation is in progress, try later return nil, errors.New(LATTICE_RETRY) } } // The controller does not manage service network anymore, just having to upsert SN and SNVA for default SN setup. // This function does not care about the association status, the caller is not supposed to wait for it. func (m *defaultServiceNetworkManager) CreateOrUpdate(ctx context.Context, serviceNetwork *model.ServiceNetwork) (model.ServiceNetworkStatus, error) { // check if exists foundSnSummary, err := m.cloud.Lattice().FindServiceNetwork(ctx, serviceNetwork.Spec.Name) if err != nil && !services.IsNotFoundError(err) { return model.ServiceNetworkStatus{ServiceNetworkARN: "", ServiceNetworkID: ""}, err } var serviceNetworkId string var serviceNetworkArn string vpcLatticeSess := m.cloud.Lattice() if foundSnSummary == nil { m.log.Debugf(ctx, "Creating ServiceNetwork %s and tagging it with vpcId %s", serviceNetwork.Spec.Name, config.VpcID) serviceNetworkInput := vpclattice.CreateServiceNetworkInput{ Name: &serviceNetwork.Spec.Name, Tags: m.cloud.DefaultTags(), } resp, err := vpcLatticeSess.CreateServiceNetworkWithContext(ctx, &serviceNetworkInput) if err != nil { return model.ServiceNetworkStatus{}, err } serviceNetworkId = aws.StringValue(resp.Id) serviceNetworkArn = aws.StringValue(resp.Arn) } else { m.log.Debugf(ctx, "ServiceNetwork %s exists, checking its VPC association", serviceNetwork.Spec.Name) serviceNetworkId = aws.StringValue(foundSnSummary.SvcNetwork.Id) serviceNetworkArn = aws.StringValue(foundSnSummary.SvcNetwork.Arn) snva, err := m.getActiveVpcAssociation(ctx, serviceNetworkId) if err != nil { return model.ServiceNetworkStatus{}, err } if snva != nil { m.log.Debugf(ctx, "ServiceNetwork %s already has VPC association %s", serviceNetwork.Spec.Name, aws.StringValue(snva.Arn)) return model.ServiceNetworkStatus{ServiceNetworkARN: serviceNetworkArn, ServiceNetworkID: serviceNetworkId}, nil } } m.log.Debugf(ctx, "Creating association between ServiceNetwork %s and VPC %s", serviceNetworkId, config.VpcID) createServiceNetworkVpcAssociationInput := vpclattice.CreateServiceNetworkVpcAssociationInput{ ServiceNetworkIdentifier: &serviceNetworkId, VpcIdentifier: &config.VpcID, Tags: m.cloud.DefaultTags(), } _, err = vpcLatticeSess.CreateServiceNetworkVpcAssociationWithContext(ctx, &createServiceNetworkVpcAssociationInput) if err != nil { return model.ServiceNetworkStatus{}, err } return model.ServiceNetworkStatus{ServiceNetworkARN: serviceNetworkArn, ServiceNetworkID: serviceNetworkId}, nil } func (m *defaultServiceNetworkManager) updateServiceNetworkVpcAssociation(ctx context.Context, existingSN *vpclattice.ServiceNetworkSummary, sgIds []*string, existingSnvaId *string) (model.ServiceNetworkStatus, error) { snva, err := m.cloud.Lattice().GetServiceNetworkVpcAssociationWithContext(ctx, &vpclattice.GetServiceNetworkVpcAssociationInput{ ServiceNetworkVpcAssociationIdentifier: existingSnvaId, }) if err != nil { return model.ServiceNetworkStatus{}, err } sgIdsEqual := securityGroupIdsEqual(sgIds, snva.SecurityGroupIds) if sgIdsEqual { // desiredSN's security group ids are same with snva's security group ids, don't need to update return model.ServiceNetworkStatus{ ServiceNetworkID: *existingSN.Id, ServiceNetworkARN: *existingSN.Arn, SnvaSecurityGroupIds: snva.SecurityGroupIds, }, nil } updateSnvaResp, err := m.cloud.Lattice().UpdateServiceNetworkVpcAssociationWithContext(ctx, &vpclattice.UpdateServiceNetworkVpcAssociationInput{ ServiceNetworkVpcAssociationIdentifier: existingSnvaId, SecurityGroupIds: sgIds, }) if err != nil { return model.ServiceNetworkStatus{}, err } if *updateSnvaResp.Status == vpclattice.ServiceNetworkVpcAssociationStatusActive { return model.ServiceNetworkStatus{ ServiceNetworkID: *existingSN.Id, ServiceNetworkARN: *existingSN.Arn, SnvaSecurityGroupIds: updateSnvaResp.SecurityGroupIds, }, nil } else { return model.ServiceNetworkStatus{}, fmt.Errorf("%w, update snva status: %s", RetryErr, *updateSnvaResp.Status) } } func securityGroupIdsEqual(arr1, arr2 []*string) bool { ids1 := utils.SliceMap(arr1, aws.StringValue) slices.Sort(ids1) ids2 := utils.SliceMap(arr2, aws.StringValue) slices.Sort(ids2) return slices.Equal(ids1, ids2) }