pkg/deploy/lattice/service_manager.go (335 lines of code) (raw):
package lattice
import (
"context"
"fmt"
"github.com/aws/aws-application-networking-k8s/pkg/aws/services"
"github.com/aws/aws-application-networking-k8s/pkg/utils/gwlog"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/vpclattice"
pkg_aws "github.com/aws/aws-application-networking-k8s/pkg/aws"
model "github.com/aws/aws-application-networking-k8s/pkg/model/lattice"
)
//go:generate mockgen -destination service_manager_mock.go -package lattice github.com/aws/aws-application-networking-k8s/pkg/deploy/lattice ServiceManager
type Service = model.Service
type ServiceInfo = model.ServiceStatus
type CreateSvcReq = vpclattice.CreateServiceInput
type CreateSvcResp = vpclattice.CreateServiceOutput
type UpdateSvcReq = vpclattice.UpdateServiceInput
type UpdateSvcResp = vpclattice.UpdateServiceOutput
type CreateSnSvcAssocReq = vpclattice.CreateServiceNetworkServiceAssociationInput
type CreateSnSvcAssocResp = vpclattice.CreateServiceNetworkServiceAssociationOutput
type DelSnSvcAssocReq = vpclattice.DeleteServiceNetworkServiceAssociationInput
type DelSnSvcAssocResp = vpclattice.DeleteServiceNetworkServiceAssociationOutput
type GetSvcReq = vpclattice.GetServiceInput
type SvcSummary = vpclattice.ServiceSummary
type ListSnSvcAssocsReq = vpclattice.ListServiceNetworkServiceAssociationsInput
type SnSvcAssocSummary = vpclattice.ServiceNetworkServiceAssociationSummary
type ServiceManager interface {
Upsert(ctx context.Context, service *model.Service) (model.ServiceStatus, error)
Delete(ctx context.Context, service *model.Service) error
}
type defaultServiceManager struct {
log gwlog.Logger
cloud pkg_aws.Cloud
}
func NewServiceManager(log gwlog.Logger, cloud pkg_aws.Cloud) *defaultServiceManager {
return &defaultServiceManager{
log: log,
cloud: cloud,
}
}
func (m *defaultServiceManager) createServiceAndAssociate(ctx context.Context, svc *Service) (ServiceInfo, error) {
createSvcReq := m.newCreateSvcReq(svc)
createSvcResp, err := m.cloud.Lattice().CreateServiceWithContext(ctx, createSvcReq)
if err != nil {
return ServiceInfo{}, fmt.Errorf("failed CreateService %s due to %s", aws.StringValue(createSvcReq.Name), err)
}
m.log.Infof(ctx, "Success CreateService %s %s",
aws.StringValue(createSvcResp.Name), aws.StringValue(createSvcResp.Id))
for _, snName := range svc.Spec.ServiceNetworkNames {
err = m.createAssociation(ctx, createSvcResp.Id, snName)
if err != nil {
return ServiceInfo{}, err
}
}
svcInfo := svcStatusFromCreateSvcResp(createSvcResp)
return svcInfo, nil
}
func (m *defaultServiceManager) createAssociation(ctx context.Context, svcId *string, snName string) error {
snInfo, err := m.cloud.Lattice().FindServiceNetwork(ctx, snName)
if err != nil {
return err
}
assocReq := &CreateSnSvcAssocReq{
ServiceIdentifier: svcId,
ServiceNetworkIdentifier: snInfo.SvcNetwork.Id,
Tags: m.cloud.DefaultTags(),
}
assocResp, err := m.cloud.Lattice().CreateServiceNetworkServiceAssociationWithContext(ctx, assocReq)
if err != nil {
return fmt.Errorf("failed CreateServiceNetworkServiceAssociation %s %s due to %s",
aws.StringValue(assocReq.ServiceNetworkIdentifier), aws.StringValue(assocReq.ServiceIdentifier), err)
}
m.log.Infof(ctx, "Success CreateServiceNetworkServiceAssociation %s %s",
aws.StringValue(assocReq.ServiceNetworkIdentifier), aws.StringValue(assocReq.ServiceIdentifier))
err = handleCreateAssociationResp(assocResp)
if err != nil {
return err
}
return nil
}
func (m *defaultServiceManager) newCreateSvcReq(svc *Service) *CreateSvcReq {
svcName := svc.LatticeServiceName()
req := &vpclattice.CreateServiceInput{
Name: &svcName,
Tags: m.cloud.DefaultTagsMergedWith(svc.Spec.ToTags()),
}
if svc.Spec.CustomerDomainName != "" {
req.CustomDomainName = &svc.Spec.CustomerDomainName
}
if svc.Spec.CustomerCertARN != "" {
req.SetCertificateArn(svc.Spec.CustomerCertARN)
}
return req
}
func svcStatusFromCreateSvcResp(resp *CreateSvcResp) ServiceInfo {
svcInfo := ServiceInfo{}
if resp == nil {
return svcInfo
}
svcInfo.Arn = aws.StringValue(resp.Arn)
svcInfo.Id = aws.StringValue(resp.Id)
if resp.DnsEntry != nil {
svcInfo.Dns = aws.StringValue(resp.DnsEntry.DomainName)
}
return svcInfo
}
func (m *defaultServiceManager) checkAndUpdateTags(ctx context.Context, svc *Service, svcSum *SvcSummary) error {
tagsResp, err := m.cloud.Lattice().ListTagsForResourceWithContext(ctx, &vpclattice.ListTagsForResourceInput{
ResourceArn: svcSum.Arn,
})
if err != nil {
return err
}
owned, err := m.cloud.TryOwnFromTags(ctx, *svcSum.Arn, tagsResp.Tags)
if err != nil {
return err
}
if !owned {
return services.NewConflictError("service", svc.Spec.RouteNamespace+"/"+svc.Spec.RouteName,
fmt.Sprintf("Found existing resource not owned by controller: %s", *svcSum.Arn))
}
tagFields := model.ServiceTagFieldsFromTags(tagsResp.Tags)
switch {
case tagFields.RouteName == "" && tagFields.RouteNamespace == "":
// backwards compatibility: If the service has no identification tags, consider this controller has
// correct information and add tags
_, err = m.cloud.Lattice().TagResourceWithContext(ctx, &vpclattice.TagResourceInput{
ResourceArn: svcSum.Arn,
Tags: svc.Spec.ToTags(),
})
return err
case tagFields != svc.Spec.ServiceTagFields:
// Considering these scenarios:
// - two services with same namespace-name but different routeType
// - two services with conflict edge case such as my-namespace/service & my/namespace-service
return services.NewConflictError("service", svc.Spec.RouteName+"/"+svc.Spec.RouteNamespace,
fmt.Sprintf("Found existing resource with conflicting service name: %s", *svcSum.Arn))
}
return nil
}
func (m *defaultServiceManager) updateServiceAndAssociations(ctx context.Context, svc *Service, svcSum *SvcSummary) (ServiceInfo, error) {
if svc.Spec.CustomerCertARN != "" {
updReq := &UpdateSvcReq{
CertificateArn: aws.String(svc.Spec.CustomerCertARN),
ServiceIdentifier: svcSum.Id,
}
_, err := m.cloud.Lattice().UpdateService(updReq)
if err != nil {
return ServiceInfo{}, err
}
}
err := m.updateAssociations(ctx, svc, svcSum)
if err != nil {
return ServiceInfo{}, err
}
svcInfo := ServiceInfo{
Arn: aws.StringValue(svcSum.Arn),
Id: aws.StringValue(svcSum.Id),
}
if svcSum.DnsEntry != nil {
svcInfo.Dns = aws.StringValue(svcSum.DnsEntry.DomainName)
}
return svcInfo, nil
}
func (m *defaultServiceManager) getAllAssociations(ctx context.Context, svcSum *SvcSummary) ([]*SnSvcAssocSummary, error) {
assocsReq := &ListSnSvcAssocsReq{
ServiceIdentifier: svcSum.Id,
}
assocs, err := m.cloud.Lattice().ListServiceNetworkServiceAssociationsAsList(ctx, assocsReq)
if err != nil {
return nil, err
}
return assocs, err
}
// update SN-Svc associations, if svc has no SN associations will delete all of them
// does not delete associations that are not tagged by controller
func (m *defaultServiceManager) updateAssociations(ctx context.Context, svc *Service, svcSum *SvcSummary) error {
assocs, err := m.getAllAssociations(ctx, svcSum)
if err != nil {
return err
}
toCreate, toDelete, err := associationsDiff(svc, assocs)
if err != nil {
return err
}
for _, snName := range toCreate {
err := m.createAssociation(ctx, svcSum.Id, snName)
if err != nil {
return err
}
}
for _, assoc := range toDelete {
isManaged, err := m.cloud.IsArnManaged(ctx, *assoc.Arn)
if err != nil {
// TODO check for vpclattice.ErrCodeAccessDeniedException or a new error type ErrorCodeNotFoundException
// when the api no longer responds with a 404 NotFoundException instead of either of the above.
// ErrorCodeNotFoundException currently not part of the golang sdk for the lattice api. This a is a distinct
// error from vpclattice.ErrCodeResourceNotFoundException.
// In a scenario that the service association is created by a foreign account,
// the owner account's controller cannot read the tags of this ServiceNetworkServiceAssociation,
// and AccessDeniedException is expected.
m.log.Warnf(ctx, "skipping update associations service: %s, association: %s, error: %s", svc.LatticeServiceName(), *assoc.Arn, err)
continue
}
if isManaged {
err = m.deleteAssociation(ctx, assoc.Arn)
if err != nil {
return err
}
}
}
return nil
}
// returns RetryErr on all non-active Sn-Svc association responses
func handleCreateAssociationResp(resp *CreateSnSvcAssocResp) error {
status := aws.StringValue(resp.Status)
if status != vpclattice.ServiceNetworkServiceAssociationStatusActive {
return fmt.Errorf("%w: sn-service-association-id: %s, non-active status: %s",
RetryErr, aws.StringValue(resp.Id), status)
}
return nil
}
// compare current sn-svc associations with new ones,
// returns 2 slices: toCreate with SN names and toDelete with current associations
// if assoc should be created but current state is in deletion we should retry
func associationsDiff(svc *Service, curAssocs []*SnSvcAssocSummary) ([]string, []SnSvcAssocSummary, error) {
toCreate := []string{}
toDelete := []SnSvcAssocSummary{}
// create two Sets and find Difference New-Old->toCreate and Old-New->toDelete
newSet := map[string]bool{}
for _, sn := range svc.Spec.ServiceNetworkNames {
newSet[sn] = true
}
oldSet := map[string]SnSvcAssocSummary{}
for _, sn := range curAssocs {
oldSet[*sn.ServiceNetworkName] = *sn
}
for newSn := range newSet {
oldSn, ok := oldSet[newSn]
if !ok {
toCreate = append(toCreate, newSn)
}
// assoc should exists but in deletion state, will retry later to re-create
// TODO: we should have something more lightweight, retrying full reconciliation looks to heavy
if aws.StringValue(oldSn.Status) == vpclattice.ServiceNetworkServiceAssociationStatusDeleteInProgress {
return nil, nil, fmt.Errorf("%w: want to associate sn: %s to svc: %s, but status is: %s",
RetryErr, newSn, svc.LatticeServiceName(), *oldSn.Status)
}
// TODO: if assoc in failed state, may be we should try to re-create?
}
for oldSn, sn := range oldSet {
_, ok := newSet[oldSn]
if !ok {
toDelete = append(toDelete, sn)
}
}
return toCreate, toDelete, nil
}
func (m *defaultServiceManager) deleteAllAssociations(ctx context.Context, svc *SvcSummary) error {
assocs, err := m.getAllAssociations(ctx, svc)
if err != nil {
return err
}
for _, assoc := range assocs {
err = m.deleteAssociation(ctx, assoc.Arn)
if err != nil {
return err
}
}
return nil
}
func (m *defaultServiceManager) deleteAllListeners(ctx context.Context, svc *SvcSummary) error {
listeners, err := m.cloud.Lattice().ListListenersAsList(ctx, &vpclattice.ListListenersInput{
ServiceIdentifier: svc.Id,
})
if err != nil {
return err
}
for _, listener := range listeners {
_, err = m.cloud.Lattice().DeleteListenerWithContext(ctx, &vpclattice.DeleteListenerInput{
ServiceIdentifier: svc.Id,
ListenerIdentifier: listener.Id,
})
if err != nil {
return err
}
}
return nil
}
func (m *defaultServiceManager) deleteAssociation(ctx context.Context, assocArn *string) error {
delReq := &DelSnSvcAssocReq{ServiceNetworkServiceAssociationIdentifier: assocArn}
_, err := m.cloud.Lattice().DeleteServiceNetworkServiceAssociationWithContext(ctx, delReq)
if err != nil {
return fmt.Errorf("failed DeleteServiceNetworkServiceAssociation %s due to %s",
aws.StringValue(assocArn), err)
}
m.log.Infof(ctx, "Success DeleteServiceNetworkServiceAssociation %s", aws.StringValue(assocArn))
return nil
}
func (m *defaultServiceManager) deleteService(ctx context.Context, svc *SvcSummary) error {
delInput := vpclattice.DeleteServiceInput{
ServiceIdentifier: svc.Id,
}
_, err := m.cloud.Lattice().DeleteServiceWithContext(ctx, &delInput)
if err != nil {
return fmt.Errorf("failed DeleteService %s due to %s", aws.StringValue(svc.Id), err)
}
m.log.Infof(ctx, "Success DeleteService %s", *svc.Id)
return nil
}
// Create or update Service and ServiceNetwork-Service associations
func (m *defaultServiceManager) Upsert(ctx context.Context, svc *Service) (ServiceInfo, error) {
svcSum, err := m.cloud.Lattice().FindService(ctx, svc.LatticeServiceName())
if err != nil && !services.IsNotFoundError(err) {
return ServiceInfo{}, err
}
var svcInfo ServiceInfo
if svcSum == nil {
svcInfo, err = m.createServiceAndAssociate(ctx, svc)
} else {
err = m.checkAndUpdateTags(ctx, svc, svcSum)
if err != nil {
return ServiceInfo{}, err
}
svcInfo, err = m.updateServiceAndAssociations(ctx, svc, svcSum)
}
if err != nil {
return ServiceInfo{}, err
}
return svcInfo, nil
}
func (m *defaultServiceManager) Delete(ctx context.Context, svc *Service) error {
svcSum, err := m.cloud.Lattice().FindService(ctx, svc.LatticeServiceName())
if err != nil {
if services.IsNotFoundError(err) {
return nil // already deleted
} else {
return err
}
}
err = m.checkAndUpdateTags(ctx, svc, svcSum)
if err != nil {
m.log.Infof(ctx, "Service %s is either invalid or not owned. Skipping VPC Lattice resource deletion.", svc.LatticeServiceName())
return nil
}
err = m.deleteAllAssociations(ctx, svcSum)
if err != nil {
return err
}
// deleting listeners explicitly helps ensure target groups are free to delete
err = m.deleteAllListeners(ctx, svcSum)
if err != nil {
return err
}
err = m.deleteService(ctx, svcSum)
if err != nil {
return err
}
return nil
}