pkg/virtualnode/resource_manager.go (320 lines of code) (raw):
package virtualnode
import (
"context"
"fmt"
appmesh "github.com/aws/aws-app-mesh-controller-for-k8s/apis/appmesh/v1beta2"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws/services"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/conversions"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/equality"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/k8s"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/mesh"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/references"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/runtime"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
appmeshsdk "github.com/aws/aws-sdk-go/service/appmesh"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ResourceManager is dedicated to manage AppMesh VirtualNode resources for k8s VirtualNode CRs.
type ResourceManager interface {
// Reconcile will create/update AppMesh VirtualNode to match vn.spec, and update vn.status
Reconcile(ctx context.Context, vn *appmesh.VirtualNode) error
// Cleanup will delete AppMesh VirtualNode created for vn.
Cleanup(ctx context.Context, vn *appmesh.VirtualNode) error
}
func NewDefaultResourceManager(
k8sClient client.Client,
appMeshSDK services.AppMesh,
referencesResolver references.Resolver,
accountID string,
log logr.Logger,
enableBackendGroups bool) ResourceManager {
return &defaultResourceManager{
k8sClient: k8sClient,
appMeshSDK: appMeshSDK,
referencesResolver: referencesResolver,
accountID: accountID,
log: log,
enableBackendGroups: enableBackendGroups,
}
}
// defaultResourceManager implements ResourceManager
type defaultResourceManager struct {
k8sClient client.Client
appMeshSDK services.AppMesh
referencesResolver references.Resolver
accountID string
log logr.Logger
enableBackendGroups bool
}
func (m *defaultResourceManager) Reconcile(ctx context.Context, vn *appmesh.VirtualNode) error {
ms, err := m.findMeshDependency(ctx, vn)
if err != nil {
return err
}
if err := m.validateMeshDependencies(ctx, ms); err != nil {
return err
}
vsByKey, err := m.findVirtualServiceDependencies(ctx, vn)
if err != nil {
return err
}
if err := m.validateVirtualServiceDependencies(ctx, ms, vsByKey); err != nil {
return err
}
sdkVN, err := m.findSDKVirtualNode(ctx, ms, vn)
if err != nil {
return err
}
if sdkVN == nil {
sdkVN, err = m.createSDKVirtualNode(ctx, ms, vn, vsByKey)
if err != nil {
return err
}
} else {
sdkVN, err = m.updateSDKVirtualNode(ctx, sdkVN, ms, vn, vsByKey)
if err != nil {
return err
}
}
return m.updateCRDVirtualNode(ctx, vn, sdkVN)
}
func (m *defaultResourceManager) Cleanup(ctx context.Context, vn *appmesh.VirtualNode) error {
ms, err := m.findMeshDependency(ctx, vn)
if err != nil {
return err
}
sdkVN, err := m.findSDKVirtualNode(ctx, ms, vn)
if err != nil {
if vn.Status.VirtualNodeARN == nil {
return nil
}
return err
}
if sdkVN == nil {
return nil
}
return m.deleteSDKVirtualNode(ctx, sdkVN, ms, vn)
}
// findMeshDependency find the Mesh dependency for this virtualNode.
func (m *defaultResourceManager) findMeshDependency(ctx context.Context, vn *appmesh.VirtualNode) (*appmesh.Mesh, error) {
if vn.Spec.MeshRef == nil {
return nil, errors.Errorf("meshRef shouldn't be nil, please check webhook setup")
}
ms, err := m.referencesResolver.ResolveMeshReference(ctx, *vn.Spec.MeshRef)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve meshRef")
}
return ms, nil
}
// validateMeshDependencies validate the Mesh dependency for this virtualNode.
func (m *defaultResourceManager) validateMeshDependencies(ctx context.Context, ms *appmesh.Mesh) error {
if !mesh.IsMeshActive(ms) {
return runtime.NewRequeueError(errors.New("mesh is not active yet"))
}
return nil
}
// findVirtualServiceDependencies find the VirtualService dependencies for this virtualNode.
func (m *defaultResourceManager) findVirtualServiceDependencies(ctx context.Context, vn *appmesh.VirtualNode) (map[types.NamespacedName]*appmesh.VirtualService, error) {
vsByKey := make(map[types.NamespacedName]*appmesh.VirtualService)
vsRefs := ExtractVirtualServiceReferences(vn)
if m.enableBackendGroups {
for _, backendGroupRef := range vn.Spec.BackendGroups {
// Wildcard special case
bgKey := references.ObjectKeyForBackendGroupReference(vn, backendGroupRef)
if bgKey.Name == "*" {
var listOptions client.ListOptions
listOptions.Namespace = bgKey.Namespace
vsList := &appmesh.VirtualServiceList{}
if err := m.k8sClient.List(ctx, vsList, &listOptions); err != nil {
return nil, fmt.Errorf("could not list virtualservices for namespace: %s", bgKey.Namespace)
}
for _, vs := range vsList.Items {
vsRef := appmesh.VirtualServiceReference{
Namespace: aws.String(vs.Namespace),
Name: vs.Name,
}
vsRefs = append(vsRefs, vsRef)
}
} else {
bg, err := m.referencesResolver.ResolveBackendGroupReference(ctx, vn, backendGroupRef)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve backendGroupRef")
}
vsRefs = append(vsRefs, bg.Spec.VirtualServices...)
}
}
}
for _, vsRef := range vsRefs {
vsKey := references.ObjectKeyForVirtualServiceReference(vn, vsRef)
if _, ok := vsByKey[vsKey]; ok {
continue
}
vs, err := m.referencesResolver.ResolveVirtualServiceReference(ctx, vn, vsRef)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve virtualServiceRef")
}
vsByKey[vsKey] = vs
}
return vsByKey, nil
}
// validateVirtualServiceDependencies validates the VirtualService dependencies for this virtualNode.
// AppMesh API allows to create a virtualNode with virtualService backend without the virtualService presents first.
// we will not validate whether virtualService is active or not, to allows circular dependency between virtualNode and virtualService.
func (m *defaultResourceManager) validateVirtualServiceDependencies(ctx context.Context, ms *appmesh.Mesh, vsByKey map[types.NamespacedName]*appmesh.VirtualService) error {
for _, vs := range vsByKey {
if vs.Spec.MeshRef == nil || !mesh.IsMeshReferenced(ms, *vs.Spec.MeshRef) {
return errors.Errorf("virtualService %v didn't belong to mesh %v", k8s.NamespacedName(vs), k8s.NamespacedName(ms))
}
}
return nil
}
func (m *defaultResourceManager) findSDKVirtualNode(ctx context.Context, ms *appmesh.Mesh, vn *appmesh.VirtualNode) (*appmeshsdk.VirtualNodeData, error) {
resp, err := m.appMeshSDK.DescribeVirtualNodeWithContext(ctx, &appmeshsdk.DescribeVirtualNodeInput{
MeshName: ms.Spec.AWSName,
MeshOwner: ms.Spec.MeshOwner,
VirtualNodeName: vn.Spec.AWSName,
})
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "NotFoundException" {
return nil, nil
}
return nil, err
}
return resp.VirtualNode, nil
}
func (m *defaultResourceManager) createSDKVirtualNode(ctx context.Context, ms *appmesh.Mesh, vn *appmesh.VirtualNode, vsByKey map[types.NamespacedName]*appmesh.VirtualService) (*appmeshsdk.VirtualNodeData, error) {
sdkVNSpec, err := BuildSDKVirtualNodeSpec(vn, vsByKey)
if err != nil {
return nil, err
}
resp, err := m.appMeshSDK.CreateVirtualNodeWithContext(ctx, &appmeshsdk.CreateVirtualNodeInput{
MeshName: ms.Spec.AWSName,
MeshOwner: ms.Spec.MeshOwner,
Spec: sdkVNSpec,
VirtualNodeName: vn.Spec.AWSName,
})
if err != nil {
return nil, err
}
return resp.VirtualNode, nil
}
func (m *defaultResourceManager) updateSDKVirtualNode(ctx context.Context, sdkVN *appmeshsdk.VirtualNodeData, ms *appmesh.Mesh, vn *appmesh.VirtualNode, vsByKey map[types.NamespacedName]*appmesh.VirtualService) (*appmeshsdk.VirtualNodeData, error) {
actualSDKVNSpec := sdkVN.Spec
desiredSDKVNSpec, err := BuildSDKVirtualNodeSpec(vn, vsByKey)
if err != nil {
return nil, err
}
opts := equality.CompareOptionForVirtualNodeSpec()
if cmp.Equal(desiredSDKVNSpec, actualSDKVNSpec, opts) {
return sdkVN, nil
}
if !m.isSDKVirtualNodeControlledByCRDVirtualNode(ctx, sdkVN, vn) {
m.log.V(1).Info("skip virtualNode update since it's not controlled",
"virtualNode", k8s.NamespacedName(vn),
"virtualNodeARN", aws.StringValue(sdkVN.Metadata.Arn),
)
return sdkVN, nil
}
diff := cmp.Diff(desiredSDKVNSpec, actualSDKVNSpec, opts)
m.log.V(1).Info("virtualNodeSpec changed",
"virtualNode", k8s.NamespacedName(vn),
"actualSDKVNSpec", actualSDKVNSpec,
"desiredSDKVNSpec", desiredSDKVNSpec,
"diff", diff,
)
resp, err := m.appMeshSDK.UpdateVirtualNodeWithContext(ctx, &appmeshsdk.UpdateVirtualNodeInput{
MeshName: ms.Spec.AWSName,
MeshOwner: ms.Spec.MeshOwner,
Spec: desiredSDKVNSpec,
VirtualNodeName: sdkVN.VirtualNodeName,
})
if err != nil {
return nil, err
}
return resp.VirtualNode, nil
}
func (m *defaultResourceManager) deleteSDKVirtualNode(ctx context.Context, sdkVN *appmeshsdk.VirtualNodeData, ms *appmesh.Mesh, vn *appmesh.VirtualNode) error {
if !m.isSDKVirtualNodeOwnedByCRDVirtualNode(ctx, sdkVN, vn) {
m.log.V(1).Info("skip mesh virtualNode since its not owned",
"virtualNode", k8s.NamespacedName(vn),
"virtualNodeARN", aws.StringValue(sdkVN.Metadata.Arn),
)
return nil
}
_, err := m.appMeshSDK.DeleteVirtualNodeWithContext(ctx, &appmeshsdk.DeleteVirtualNodeInput{
MeshName: ms.Spec.AWSName,
MeshOwner: ms.Spec.MeshOwner,
VirtualNodeName: sdkVN.VirtualNodeName,
})
if err != nil {
return err
}
return nil
}
func (m *defaultResourceManager) updateCRDVirtualNode(ctx context.Context, vn *appmesh.VirtualNode, sdkVN *appmeshsdk.VirtualNodeData) error {
oldVN := vn.DeepCopy()
needsUpdate := false
if aws.StringValue(vn.Status.VirtualNodeARN) != aws.StringValue(sdkVN.Metadata.Arn) {
vn.Status.VirtualNodeARN = sdkVN.Metadata.Arn
needsUpdate = true
}
if aws.Int64Value(vn.Status.ObservedGeneration) != vn.Generation {
vn.Status.ObservedGeneration = aws.Int64(vn.Generation)
needsUpdate = true
}
vnActiveConditionStatus := corev1.ConditionFalse
if sdkVN.Status != nil && aws.StringValue(sdkVN.Status.Status) == appmeshsdk.VirtualNodeStatusCodeActive {
vnActiveConditionStatus = corev1.ConditionTrue
}
if updateCondition(vn, appmesh.VirtualNodeActive, vnActiveConditionStatus, nil, nil) {
needsUpdate = true
}
if !needsUpdate {
return nil
}
return m.k8sClient.Status().Patch(ctx, vn, client.MergeFrom(oldVN))
}
func (m *defaultResourceManager) buildSDKVirtualNodeTags(ctx context.Context, vn *appmesh.VirtualNode) []*appmeshsdk.TagRef {
// TODO, support tags
return nil
}
// isSDKVirtualNodeControlledByCRDVirtualNode checks whether an AppMesh virtualNode is controlled by CRD virtualNode
// if it's controlled, CRD virtualNode update is responsible for updating the AppMesh virtualNode.
func (m *defaultResourceManager) isSDKVirtualNodeControlledByCRDVirtualNode(ctx context.Context, sdkVN *appmeshsdk.VirtualNodeData, vn *appmesh.VirtualNode) bool {
return aws.StringValue(sdkVN.Metadata.ResourceOwner) == m.accountID
}
// isSDKVirtualNodeOwnedByCRDVirtualNode checks whether an AppMesh virtualNode is owned by CRD virtualNode.
// if it's owned, CRD virtualNode deletion is responsible for deleting the AppMesh virtualNode.
func (m *defaultResourceManager) isSDKVirtualNodeOwnedByCRDVirtualNode(ctx context.Context, sdkVN *appmeshsdk.VirtualNodeData, vn *appmesh.VirtualNode) bool {
if !m.isSDKVirtualNodeControlledByCRDVirtualNode(ctx, sdkVN, vn) {
return false
}
// TODO: Adding tagging support, so a existing virtualNode in owner account but not ownership can be support.
// currently, virtualNode controllership == ownership, but it don't have to be so once we add tagging support.
return true
}
func BuildSDKVirtualNodeSpec(vn *appmesh.VirtualNode, vsByKey map[types.NamespacedName]*appmesh.VirtualService) (*appmeshsdk.VirtualNodeSpec, error) {
converter := conversion.NewConverter(conversion.DefaultNameFunc)
converter.RegisterUntypedConversionFunc((*appmesh.VirtualNodeSpec)(nil), (*appmeshsdk.VirtualNodeSpec)(nil), func(a, b interface{}, scope conversion.Scope) error {
return conversions.Convert_CRD_VirtualNodeSpec_To_SDK_VirtualNodeSpec(a.(*appmesh.VirtualNodeSpec), b.(*appmeshsdk.VirtualNodeSpec), scope)
})
sdkVSRefConvertFunc := references.BuildSDKVirtualServiceReferenceConvertFunc(vn, vsByKey)
converter.RegisterUntypedConversionFunc((*appmesh.VirtualServiceReference)(nil), (*string)(nil), func(a, b interface{}, scope conversion.Scope) error {
return sdkVSRefConvertFunc(a.(*appmesh.VirtualServiceReference), b.(*string), scope)
})
sdkVNSpec := &appmeshsdk.VirtualNodeSpec{}
tempSpec := vn.Spec.DeepCopy()
backendMap := make(map[types.NamespacedName]bool)
for _, backend := range tempSpec.Backends {
if backend.VirtualService.VirtualServiceRef != nil {
vsKey := references.ObjectKeyForVirtualServiceReference(vn, *backend.VirtualService.VirtualServiceRef)
backendMap[vsKey] = true
}
}
for vsKey, vs := range vsByKey {
if _, ok := backendMap[vsKey]; ok {
continue
}
tempSpec.Backends = append(tempSpec.Backends, appmesh.Backend{
VirtualService: appmesh.VirtualServiceBackend{
VirtualServiceRef: &appmesh.VirtualServiceReference{
Namespace: aws.String(vs.Namespace),
Name: vs.Name,
},
},
})
}
if err := converter.Convert(tempSpec, sdkVNSpec, nil); err != nil {
return nil, err
}
return sdkVNSpec, nil
}