pkg/deploy/lattice/access_log_subscription_manager.go (197 lines of code) (raw):

package lattice import ( "context" "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/vpclattice" an_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" "github.com/aws/aws-application-networking-k8s/pkg/aws/services" "github.com/aws/aws-application-networking-k8s/pkg/model/lattice" "github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog" ) //go:generate mockgen -destination access_log_subscription_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice AccessLogSubscriptionManager type AccessLogSubscriptionManager interface { Create(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) Update(ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription) (*lattice.AccessLogSubscriptionStatus, error) Delete(ctx context.Context, accessLogSubscriptionArn string) error } type defaultAccessLogSubscriptionManager struct { log gwlog.Logger cloud an_aws.Cloud } func NewAccessLogSubscriptionManager( log gwlog.Logger, cloud an_aws.Cloud, ) *defaultAccessLogSubscriptionManager { return &defaultAccessLogSubscriptionManager{ log: log, cloud: cloud, } } func (m *defaultAccessLogSubscriptionManager) Create( ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription, ) (*lattice.AccessLogSubscriptionStatus, error) { vpcLatticeSess := m.cloud.Lattice() sourceArn, err := m.getSourceArn(ctx, accessLogSubscription.Spec.SourceType, accessLogSubscription.Spec.SourceName) if err != nil { return nil, err } tags := m.cloud.DefaultTagsMergedWith(services.Tags{ lattice.AccessLogPolicyTagKey: aws.String(accessLogSubscription.Spec.ALPNamespacedName.String()), }) createALSInput := &vpclattice.CreateAccessLogSubscriptionInput{ ResourceIdentifier: sourceArn, DestinationArn: &accessLogSubscription.Spec.DestinationArn, Tags: tags, } createALSOutput, err := vpcLatticeSess.CreateAccessLogSubscriptionWithContext(ctx, createALSInput) if err == nil { return &lattice.AccessLogSubscriptionStatus{ Arn: *createALSOutput.Arn, }, nil } switch e := err.(type) { case *vpclattice.AccessDeniedException: return nil, services.NewInvalidError(e.Message()) case *vpclattice.ResourceNotFoundException: if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" { return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName) } return nil, services.NewInvalidError(e.Message()) case *vpclattice.ConflictException: /* * Conflict may arise if we retry creation due to a failure elsewhere in the controller, * so we check if the conflicting ALS was created for the same ALP via its tags. * If it is the same ALP, return success. Else, return ConflictError. */ listALSInput := &vpclattice.ListAccessLogSubscriptionsInput{ ResourceIdentifier: sourceArn, } listALSOutput, err := vpcLatticeSess.ListAccessLogSubscriptionsWithContext(ctx, listALSInput) if err != nil { return nil, err } for _, als := range listALSOutput.Items { if *als.DestinationArn == accessLogSubscription.Spec.DestinationArn { listTagsInput := &vpclattice.ListTagsForResourceInput{ ResourceArn: als.Arn, } listTagsOutput, err := vpcLatticeSess.ListTagsForResourceWithContext(ctx, listTagsInput) if err != nil { return nil, err } value, exists := listTagsOutput.Tags[lattice.AccessLogPolicyTagKey] if exists && *value == accessLogSubscription.Spec.ALPNamespacedName.String() { return &lattice.AccessLogSubscriptionStatus{ Arn: *als.Arn, }, nil } } } return nil, services.NewConflictError( string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName, e.Message(), ) default: return nil, err } } func (m *defaultAccessLogSubscriptionManager) Update( ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription, ) (*lattice.AccessLogSubscriptionStatus, error) { vpcLatticeSess := m.cloud.Lattice() // If the source is modified, we need to replace the ALS getALSInput := &vpclattice.GetAccessLogSubscriptionInput{ AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn), } getALSOutput, err := vpcLatticeSess.GetAccessLogSubscriptionWithContext(ctx, getALSInput) if err != nil { switch e := err.(type) { case *vpclattice.AccessDeniedException: return nil, services.NewInvalidError(e.Message()) case *vpclattice.ResourceNotFoundException: return m.Create(ctx, accessLogSubscription) default: return nil, err } } sourceArn, err := m.getSourceArn(ctx, accessLogSubscription.Spec.SourceType, accessLogSubscription.Spec.SourceName) if err != nil { return nil, err } if *getALSOutput.ResourceArn != *sourceArn { return m.replaceAccessLogSubscription(ctx, accessLogSubscription) } // Source is not modified, try to update destinationArn in the existing ALS updateALSInput := &vpclattice.UpdateAccessLogSubscriptionInput{ AccessLogSubscriptionIdentifier: aws.String(accessLogSubscription.Status.Arn), DestinationArn: aws.String(accessLogSubscription.Spec.DestinationArn), } updateALSOutput, err := vpcLatticeSess.UpdateAccessLogSubscriptionWithContext(ctx, updateALSInput) if err == nil { return &lattice.AccessLogSubscriptionStatus{ Arn: *updateALSOutput.Arn, }, nil } switch e := err.(type) { case *vpclattice.AccessDeniedException: return nil, services.NewInvalidError(e.Message()) case *vpclattice.ResourceNotFoundException: if *e.ResourceType == "SERVICE_NETWORK" || *e.ResourceType == "SERVICE" { return nil, services.NewNotFoundError(string(accessLogSubscription.Spec.SourceType), accessLogSubscription.Spec.SourceName) } return m.Create(ctx, accessLogSubscription) case *vpclattice.ConflictException: /* * A conflict can happen when the destination type of the new ALS is different from the original. * To gracefully handle this, we create a new ALS with the new destination, then delete the old one. */ return m.replaceAccessLogSubscription(ctx, accessLogSubscription) default: return nil, err } } func (m *defaultAccessLogSubscriptionManager) Delete( ctx context.Context, accessLogSubscriptionArn string, ) error { vpcLatticeSess := m.cloud.Lattice() deleteALSInput := &vpclattice.DeleteAccessLogSubscriptionInput{ AccessLogSubscriptionIdentifier: aws.String(accessLogSubscriptionArn), } _, err := vpcLatticeSess.DeleteAccessLogSubscriptionWithContext(ctx, deleteALSInput) if err != nil { if _, ok := err.(*vpclattice.ResourceNotFoundException); !ok { return err } } return nil } func (m *defaultAccessLogSubscriptionManager) getSourceArn( ctx context.Context, sourceType lattice.SourceType, sourceName string, ) (*string, error) { vpcLatticeSess := m.cloud.Lattice() switch sourceType { case lattice.ServiceNetworkSourceType: serviceNetwork, err := vpcLatticeSess.FindServiceNetwork(ctx, sourceName) if err != nil { return nil, err } return serviceNetwork.SvcNetwork.Arn, nil case lattice.ServiceSourceType: service, err := vpcLatticeSess.FindService(ctx, sourceName) if err != nil { return nil, err } return service.Arn, nil default: return nil, fmt.Errorf("unsupported source type: %s", sourceType) } } func (m *defaultAccessLogSubscriptionManager) replaceAccessLogSubscription( ctx context.Context, accessLogSubscription *lattice.AccessLogSubscription, ) (*lattice.AccessLogSubscriptionStatus, error) { newAlsStatus, err := m.Create(ctx, accessLogSubscription) if err != nil { return nil, err } err = m.Delete(ctx, accessLogSubscription.Status.Arn) if err != nil { return nil, err } return newAlsStatus, nil }