pkg/virtualservice/resource_manager.go (311 lines of code) (raw):
package virtualservice
import (
"context"
appmesh "github.com/aws/aws-app-mesh-controller-for-k8s/apis/appmesh/v1beta2"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws/services"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/conversions"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/k8s"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/mesh"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/references"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/runtime"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/virtualnode"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/virtualrouter"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
appmeshsdk "github.com/aws/aws-sdk-go/service/appmesh"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ResourceManager is dedicated to manage AppMesh VirtualService resources for k8s VirtualService CRs.
type ResourceManager interface {
// Reconcile will create/update AppMesh VirtualService to match vs.spec, and update vs.status
Reconcile(ctx context.Context, vs *appmesh.VirtualService) error
// Cleanup will delete AppMesh VirtualService created for vs.
Cleanup(ctx context.Context, vs *appmesh.VirtualService) error
}
func NewDefaultResourceManager(
k8sClient client.Client,
appMeshSDK services.AppMesh,
referencesResolver references.Resolver,
accountID string,
log logr.Logger) ResourceManager {
return &defaultResourceManager{
k8sClient: k8sClient,
appMeshSDK: appMeshSDK,
referencesResolver: referencesResolver,
accountID: accountID,
log: log,
}
}
type defaultResourceManager struct {
k8sClient client.Client
appMeshSDK services.AppMesh
referencesResolver references.Resolver
accountID string
log logr.Logger
}
func (m *defaultResourceManager) Reconcile(ctx context.Context, vs *appmesh.VirtualService) error {
ms, err := m.findMeshDependency(ctx, vs)
if err != nil {
return err
}
if err := m.validateMeshDependencies(ctx, ms); err != nil {
return err
}
vnByKey, err := m.findVirtualNodeDependencies(ctx, vs)
if err != nil {
return err
}
if err := m.validateVirtualNodeDependencies(ctx, ms, vnByKey); err != nil {
return err
}
vrByKey, err := m.findVirtualRouterDependencies(ctx, vs)
if err != nil {
return err
}
if err := m.validateVirtualRouterDependencies(ctx, ms, vrByKey); err != nil {
return err
}
sdkVS, err := m.findSDKVirtualService(ctx, ms, vs)
if err != nil {
return err
}
if sdkVS == nil {
sdkVS, err = m.createSDKVirtualService(ctx, ms, vs, vnByKey, vrByKey)
if err != nil {
return err
}
} else {
sdkVS, err = m.updateSDKVirtualService(ctx, sdkVS, vs, vnByKey, vrByKey)
if err != nil {
return err
}
}
return m.updateCRDVirtualService(ctx, vs, sdkVS)
}
func (m *defaultResourceManager) Cleanup(ctx context.Context, vs *appmesh.VirtualService) error {
ms, err := m.findMeshDependency(ctx, vs)
if err != nil {
return err
}
sdkVS, err := m.findSDKVirtualService(ctx, ms, vs)
if err != nil {
if vs.Status.VirtualServiceARN == nil {
return nil
}
return err
}
if sdkVS == nil {
return nil
}
return m.deleteSDKVirtualService(ctx, sdkVS, vs)
}
// findMeshDependency find the Mesh dependency for this VirtualService.
func (m *defaultResourceManager) findMeshDependency(ctx context.Context, vs *appmesh.VirtualService) (*appmesh.Mesh, error) {
if vs.Spec.MeshRef == nil {
return nil, errors.Errorf("meshRef shouldn't be nil, please check webhook setup")
}
ms, err := m.referencesResolver.ResolveMeshReference(ctx, *vs.Spec.MeshRef)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve meshRef")
}
return ms, nil
}
// validateMeshDependencies validate the Mesh dependency for this VirtualService.
func (m *defaultResourceManager) validateMeshDependencies(ctx context.Context, ms *appmesh.Mesh) error {
if !mesh.IsMeshActive(ms) {
return runtime.NewRequeueError(errors.New("mesh is not active yet"))
}
return nil
}
func (m *defaultResourceManager) findVirtualNodeDependencies(ctx context.Context, vs *appmesh.VirtualService) (map[types.NamespacedName]*appmesh.VirtualNode, error) {
vnRefs := ExtractVirtualNodeReferences(vs)
vnByKey := make(map[types.NamespacedName]*appmesh.VirtualNode)
for _, vnRef := range vnRefs {
vnKey := references.ObjectKeyForVirtualNodeReference(vs, vnRef)
if _, ok := vnByKey[vnKey]; ok {
continue
}
vn, err := m.referencesResolver.ResolveVirtualNodeReference(ctx, vs, vnRef)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve virtualNodeRef")
}
vnByKey[vnKey] = vn
}
return vnByKey, nil
}
func (m *defaultResourceManager) validateVirtualNodeDependencies(ctx context.Context, ms *appmesh.Mesh, vnByKey map[types.NamespacedName]*appmesh.VirtualNode) error {
for _, vn := range vnByKey {
if vn.Spec.MeshRef == nil || !mesh.IsMeshReferenced(ms, *vn.Spec.MeshRef) {
return errors.Errorf("virtualNode %v didn't belong to mesh %v", k8s.NamespacedName(vn), k8s.NamespacedName(ms))
}
if !virtualnode.IsVirtualNodeActive(vn) {
return runtime.NewRequeueError(errors.New("virtualNode is not active yet"))
}
}
return nil
}
func (m *defaultResourceManager) findVirtualRouterDependencies(ctx context.Context, vs *appmesh.VirtualService) (map[types.NamespacedName]*appmesh.VirtualRouter, error) {
vrRefs := ExtractVirtualRouterReferences(vs)
vrByKey := make(map[types.NamespacedName]*appmesh.VirtualRouter)
for _, vrRef := range vrRefs {
vrKey := references.ObjectKeyForVirtualRouterReference(vs, vrRef)
if _, ok := vrByKey[vrKey]; ok {
continue
}
vr, err := m.referencesResolver.ResolveVirtualRouterReference(ctx, vs, vrRef)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve virtualRouterRef")
}
vrByKey[vrKey] = vr
}
return vrByKey, nil
}
func (m *defaultResourceManager) validateVirtualRouterDependencies(ctx context.Context, ms *appmesh.Mesh, vrByKey map[types.NamespacedName]*appmesh.VirtualRouter) error {
for _, vr := range vrByKey {
if vr.Spec.MeshRef == nil || !mesh.IsMeshReferenced(ms, *vr.Spec.MeshRef) {
return errors.Errorf("virtualRouter %v didn't belong to mesh %v", k8s.NamespacedName(vr), k8s.NamespacedName(ms))
}
if !virtualrouter.IsVirtualRouterActive(vr) {
return runtime.NewRequeueError(errors.New("virtualRouter is not active yet"))
}
}
return nil
}
func (m *defaultResourceManager) findSDKVirtualService(ctx context.Context, ms *appmesh.Mesh, vs *appmesh.VirtualService) (*appmeshsdk.VirtualServiceData, error) {
resp, err := m.appMeshSDK.DescribeVirtualServiceWithContext(ctx, &appmeshsdk.DescribeVirtualServiceInput{
MeshName: ms.Spec.AWSName,
MeshOwner: ms.Spec.MeshOwner,
VirtualServiceName: vs.Spec.AWSName,
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "NotFoundException" {
return nil, nil
}
return nil, err
}
return resp.VirtualService, nil
}
func (m *defaultResourceManager) createSDKVirtualService(ctx context.Context, ms *appmesh.Mesh, vs *appmesh.VirtualService,
vnByKey map[types.NamespacedName]*appmesh.VirtualNode, vrByKey map[types.NamespacedName]*appmesh.VirtualRouter) (*appmeshsdk.VirtualServiceData, error) {
sdkVSSpec, err := BuildSDKVirtualServiceSpec(vs, vnByKey, vrByKey)
if err != nil {
return nil, err
}
resp, err := m.appMeshSDK.CreateVirtualServiceWithContext(ctx, &appmeshsdk.CreateVirtualServiceInput{
MeshName: ms.Spec.AWSName,
MeshOwner: ms.Spec.MeshOwner,
VirtualServiceName: vs.Spec.AWSName,
Spec: sdkVSSpec,
})
if err != nil {
return nil, err
}
return resp.VirtualService, nil
}
func (m *defaultResourceManager) updateSDKVirtualService(ctx context.Context, sdkVS *appmeshsdk.VirtualServiceData, vs *appmesh.VirtualService,
vnByKey map[types.NamespacedName]*appmesh.VirtualNode, vrByKey map[types.NamespacedName]*appmesh.VirtualRouter) (*appmeshsdk.VirtualServiceData, error) {
actualSDKVSSpec := sdkVS.Spec
desiredSDKVSSpec, err := BuildSDKVirtualServiceSpec(vs, vnByKey, vrByKey)
if err != nil {
return nil, err
}
opts := cmpopts.EquateEmpty()
if cmp.Equal(desiredSDKVSSpec, actualSDKVSSpec, opts) {
return sdkVS, nil
}
if !m.isSDKVirtualServiceControlledByCRDVirtualService(ctx, sdkVS, vs) {
m.log.V(1).Info("skip virtualService update since it's not controlled",
"virtualService", k8s.NamespacedName(vs),
"virtualServiceARN", aws.StringValue(sdkVS.Metadata.Arn),
)
return sdkVS, nil
}
diff := cmp.Diff(desiredSDKVSSpec, actualSDKVSSpec, opts)
m.log.V(1).Info("virtualServiceSpec changed",
"virtualService", k8s.NamespacedName(vs),
"actualSDKVRSpec", actualSDKVSSpec,
"desiredSDKVRSpec", desiredSDKVSSpec,
"diff", diff,
)
resp, err := m.appMeshSDK.UpdateVirtualServiceWithContext(ctx, &appmeshsdk.UpdateVirtualServiceInput{
MeshName: sdkVS.MeshName,
MeshOwner: sdkVS.Metadata.MeshOwner,
VirtualServiceName: sdkVS.VirtualServiceName,
Spec: desiredSDKVSSpec,
})
if err != nil {
return nil, err
}
return resp.VirtualService, nil
}
func (m *defaultResourceManager) deleteSDKVirtualService(ctx context.Context, sdkVS *appmeshsdk.VirtualServiceData, vs *appmesh.VirtualService) error {
if !m.isSDKVirtualServiceOwnedByCRDVirtualService(ctx, sdkVS, vs) {
m.log.V(1).Info("skip virtualService deletion since its not owned",
"virtualService", k8s.NamespacedName(vs),
"virtualServiceARN", aws.StringValue(sdkVS.Metadata.Arn),
)
return nil
}
_, err := m.appMeshSDK.DeleteVirtualServiceWithContext(ctx, &appmeshsdk.DeleteVirtualServiceInput{
MeshName: sdkVS.MeshName,
MeshOwner: sdkVS.Metadata.MeshOwner,
VirtualServiceName: sdkVS.VirtualServiceName,
})
if err != nil {
return err
}
return nil
}
func (m *defaultResourceManager) updateCRDVirtualService(ctx context.Context, vs *appmesh.VirtualService, sdkVS *appmeshsdk.VirtualServiceData) error {
oldVS := vs.DeepCopy()
needsUpdate := false
if aws.StringValue(vs.Status.VirtualServiceARN) != aws.StringValue(sdkVS.Metadata.Arn) {
vs.Status.VirtualServiceARN = sdkVS.Metadata.Arn
needsUpdate = true
}
if aws.Int64Value(vs.Status.ObservedGeneration) != vs.Generation {
vs.Status.ObservedGeneration = aws.Int64(vs.Generation)
needsUpdate = true
}
vsActiveConditionStatus := corev1.ConditionFalse
if sdkVS.Status != nil && aws.StringValue(sdkVS.Status.Status) == appmeshsdk.VirtualServiceStatusCodeActive {
vsActiveConditionStatus = corev1.ConditionTrue
}
if updateCondition(vs, appmesh.VirtualServiceActive, vsActiveConditionStatus, nil, nil) {
needsUpdate = true
}
if !needsUpdate {
return nil
}
return m.k8sClient.Status().Patch(ctx, vs, client.MergeFrom(oldVS))
}
// isSDKVirtualServiceControlledByCRDVirtualService checks whether an AppMesh VirtualService is controlled by CRD VirtualService.
// if it's controlled, CRD VirtualService update is responsible for updating the AppMesh VirtualService.
func (m *defaultResourceManager) isSDKVirtualServiceControlledByCRDVirtualService(ctx context.Context, sdkVS *appmeshsdk.VirtualServiceData, vs *appmesh.VirtualService) bool {
return aws.StringValue(sdkVS.Metadata.ResourceOwner) == m.accountID
}
// isSDKVirtualServiceOwnedByCRDVirtualService checks whether an AppMesh VirtualService is owned by CRD VirtualService.
// if it's owned, CRD VirtualService deletion is responsible for deleting the AppMesh VirtualService.
func (m *defaultResourceManager) isSDKVirtualServiceOwnedByCRDVirtualService(ctx context.Context, sdkVS *appmeshsdk.VirtualServiceData, vs *appmesh.VirtualService) bool {
if !m.isSDKVirtualServiceControlledByCRDVirtualService(ctx, sdkVS, vs) {
return false
}
// TODO: Adding tagging support, so a existing virtualService in owner account but not ownership can be support.
// currently, virtualService controllership == ownership, but it don't have to be so once we add tagging support.
return true
}
func BuildSDKVirtualServiceSpec(vs *appmesh.VirtualService, vnByKey map[types.NamespacedName]*appmesh.VirtualNode, vrByKey map[types.NamespacedName]*appmesh.VirtualRouter) (*appmeshsdk.VirtualServiceSpec, error) {
converter := conversion.NewConverter(conversion.DefaultNameFunc)
converter.RegisterUntypedConversionFunc((*appmesh.VirtualServiceSpec)(nil), (*appmeshsdk.VirtualServiceSpec)(nil), func(a, b interface{}, scope conversion.Scope) error {
return conversions.Convert_CRD_VirtualServiceSpec_To_SDK_VirtualServiceSpec(a.(*appmesh.VirtualServiceSpec), b.(*appmeshsdk.VirtualServiceSpec), scope)
})
sdkVNRefConvertFunc := references.BuildSDKVirtualNodeReferenceConvertFunc(vs, vnByKey)
converter.RegisterUntypedConversionFunc((*appmesh.VirtualNodeReference)(nil), (*string)(nil), func(a, b interface{}, scope conversion.Scope) error {
return sdkVNRefConvertFunc(a.(*appmesh.VirtualNodeReference), b.(*string), scope)
})
sdkVRRefConvertFunc := references.BuildSDKVirtualRouterReferenceConvertFunc(vs, vrByKey)
converter.RegisterUntypedConversionFunc((*appmesh.VirtualRouterReference)(nil), (*string)(nil), func(a, b interface{}, scope conversion.Scope) error {
return sdkVRRefConvertFunc(a.(*appmesh.VirtualRouterReference), b.(*string), scope)
})
sdkVSSpec := &appmeshsdk.VirtualServiceSpec{}
if err := converter.Convert(&vs.Spec, sdkVSSpec, nil); err != nil {
return nil, err
}
return sdkVSSpec, nil
}