pkg/cloudmap/resource_manager.go (425 lines of code) (raw):
package cloudmap
import (
"context"
"fmt"
"github.com/aws/aws-app-mesh-controller-for-k8s/pkg/references"
awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/servicediscovery"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
appmesh "github.com/aws/aws-app-mesh-controller-for-k8s/apis/appmesh/v1beta2"
services "github.com/aws/aws-app-mesh-controller-for-k8s/pkg/aws/services"
)
const (
// defaultServiceDNSConfigTTL is a default TTL for CloudMap service DNS records.
// NOTE(review): not referenced anywhere in this file (service creation takes its TTL
// from Config.CloudMapServiceTTL); presumably expressed in seconds — confirm usage.
defaultServiceDNSConfigTTL = 300
// defaultServiceCustomHCFailureThreshold is the FailureThreshold set on the
// HealthCheckCustomConfig of services created while custom health checks are enabled.
defaultServiceCustomHCFailureThreshold = 1
// defaultNamespaceCacheMaxSize is the size passed to the LRU cache of namespace summaries.
defaultNamespaceCacheMaxSize = 100
// defaultNamespaceCacheTTL is how long a namespace summary stays cached once added.
defaultNamespaceCacheTTL = 2 * time.Minute
// defaultServiceCacheMaxSize is the size passed to the LRU cache of service summaries.
defaultServiceCacheMaxSize = 1024
// defaultServiceCacheTTL is how long a service summary stays cached once added.
defaultServiceCacheTTL = 2 * time.Minute
// Node label keys consulted for a node's region; both spellings are checked.
nodeRegionLabelKey1 = "failure-domain.beta.kubernetes.io/region"
nodeRegionLabelKey2 = "topology.kubernetes.io/region"
// Node label keys consulted for a node's availability zone; both spellings are checked.
nodeAvailabilityZoneLabelKey1 = "failure-domain.beta.kubernetes.io/zone"
nodeAvailabilityZoneLabelKey2 = "topology.kubernetes.io/zone"
// cloudMapServiceAnnotation is the VirtualNode annotation key under which the ARN of
// the CloudMap service is recorded.
cloudMapServiceAnnotation = "cloudMapServiceARN"
)
// ResourceManager manages the AWS CloudMap resources that back a VirtualNode.
type ResourceManager interface {
// Reconcile creates or updates the CloudMap resources for the given VirtualNode.
Reconcile(ctx context.Context, vn *appmesh.VirtualNode) error
// Cleanup deletes the CloudMap resources that were created for the given VirtualNode.
Cleanup(ctx context.Context, vn *appmesh.VirtualNode) error
}
// NewDefaultResourceManager builds the default ResourceManager implementation.
// The supplied collaborators are stored as-is; in addition two LRU caches with
// expiring entries are allocated, one for CloudMap namespace summaries and one
// for CloudMap service summaries.
func NewDefaultResourceManager(
	k8sClient client.Client,
	cloudMapSDK services.CloudMap,
	referencesResolver references.Resolver,
	virtualNodeEndpointResolver VirtualNodeEndpointResolver,
	instancesReconciler InstancesReconciler,
	enableCustomHealthCheck bool,
	log logr.Logger,
	cfg Config,
	ipFamily string) ResourceManager {
	nsCache := cache.NewLRUExpireCache(defaultNamespaceCacheMaxSize)
	svcCache := cache.NewLRUExpireCache(defaultServiceCacheMaxSize)

	rm := new(defaultResourceManager)
	rm.config = cfg
	rm.k8sClient = k8sClient
	rm.cloudMapSDK = cloudMapSDK
	rm.referencesResolver = referencesResolver
	rm.virtualNodeEndpointResolver = virtualNodeEndpointResolver
	rm.instancesReconciler = instancesReconciler
	rm.enableCustomHealthCheck = enableCustomHealthCheck
	rm.namespaceSummaryCache = nsCache
	rm.serviceSummaryCache = svcCache
	rm.log = log
	rm.ipFamily = ipFamily
	return rm
}
// defaultResourceManager implements ResourceManager
type defaultResourceManager struct {
// config supplies CloudMapServiceTTL, the DNS record TTL used when creating services.
config Config
// k8sClient is used to list cluster Nodes and to patch VirtualNode annotations.
k8sClient client.Client
// cloudMapSDK is the CloudMap API client used for namespace and service calls.
cloudMapSDK services.CloudMap
// referencesResolver resolves the Mesh referenced by a VirtualNode.
referencesResolver references.Resolver
// virtualNodeEndpointResolver resolves the ready and not-ready pods of a VirtualNode.
virtualNodeEndpointResolver VirtualNodeEndpointResolver
// instancesReconciler reconciles a CloudMap service against the resolved pods.
instancesReconciler InstancesReconciler
// enableCustomHealthCheck makes newly created services carry a HealthCheckCustomConfig.
enableCustomHealthCheck bool
// namespaceSummaryCache maps namespace name -> *servicediscovery.NamespaceSummary.
namespaceSummaryCache *cache.LRUExpireCache
// serviceSummaryCache maps "<namespaceID>/<serviceName>" -> *serviceSummary.
serviceSummaryCache *cache.LRUExpireCache
log logr.Logger
// ipFamily selects the DNS record type for private DNS services: AAAA when it equals IPv6, A otherwise.
ipFamily string
}
// Reconcile brings the CloudMap resources of the given VirtualNode up to date.
// In order it: resolves the Mesh, looks up the CloudMap namespace (which must
// already exist), looks up the CloudMap service and creates it when missing,
// records the service ARN on the VirtualNode, resolves the VirtualNode's pods
// when a pod selector is set, and finally delegates to the instances reconciler.
// The first error encountered is returned unchanged.
func (m *defaultResourceManager) Reconcile(ctx context.Context, vn *appmesh.VirtualNode) error {
	ms, err := m.findMeshDependency(ctx, vn)
	if err != nil {
		return err
	}

	// NOTE(review): ServiceDiscovery is dereferenced without a nil check here;
	// presumably the caller only invokes Reconcile for CloudMap-backed VirtualNodes — confirm.
	cmConfig := vn.Spec.ServiceDiscovery.AWSCloudMap
	ns, err := m.findCloudMapNamespace(ctx, cmConfig.NamespaceName)
	switch {
	case err != nil:
		return err
	case ns == nil:
		return fmt.Errorf("cloudMap namespace not found: %v", cmConfig.NamespaceName)
	}

	svc, err := m.findCloudMapService(ctx, ns, cmConfig.ServiceName)
	if err != nil {
		return err
	}
	if svc == nil {
		if svc, err = m.createCloudMapService(ctx, vn, ns, cmConfig.ServiceName, m.config.CloudMapServiceTTL); err != nil {
			return err
		}
	}
	if err = m.updateCRDVirtualNode(ctx, vn, svc); err != nil {
		return err
	}

	// Without a pod selector both pod lists stay nil.
	var ready, notReady []*corev1.Pod
	if vn.Spec.PodSelector == nil {
		m.log.V(1).Info("VirtualNode does not have a pod selector, no endpoints")
	} else {
		ready, notReady, _, err = m.virtualNodeEndpointResolver.Resolve(ctx, vn)
		if err != nil {
			return err
		}
		m.log.V(1).Info("resolved VirtualNode endpoints",
			"readyPods", len(ready),
			"notReadyPods", len(notReady),
		)
	}

	nodeInfo := m.getClusterNodeInfo(ctx)
	return m.instancesReconciler.Reconcile(ctx, ms, vn, *svc, ready, notReady, nodeInfo)
}
// Cleanup removes the CloudMap resources belonging to the given VirtualNode.
// It is a no-op (nil) when the namespace or the service cannot be found. When
// the namespace lookup itself fails, the error is only reported if the
// VirtualNode carries the CloudMap service annotation; otherwise there is
// nothing known to clean up and nil is returned. When the service exists, the
// instances reconciler is run with no pods and the service is then deleted.
func (m *defaultResourceManager) Cleanup(ctx context.Context, vn *appmesh.VirtualNode) error {
	ms, err := m.findMeshDependency(ctx, vn)
	if err != nil {
		return err
	}

	cmConfig := vn.Spec.ServiceDiscovery.AWSCloudMap
	ns, err := m.findCloudMapNamespace(ctx, cmConfig.NamespaceName)
	if err != nil {
		if m.isCloudMapServiceCreated(ctx, vn) {
			return err
		}
		return nil
	}
	if ns == nil {
		return nil
	}

	svc, err := m.findCloudMapService(ctx, ns, cmConfig.ServiceName)
	if err != nil {
		return err
	}
	if svc == nil {
		return nil
	}

	// Reconciling against empty pod lists before the service itself is deleted.
	if err = m.instancesReconciler.Reconcile(ctx, ms, vn, *svc, nil, nil, nil); err != nil {
		return err
	}
	return m.deleteCloudMapService(ctx, vn, ns, svc)
}
// findMeshDependency resolves the Mesh that the given VirtualNode refers to.
// It returns an error when the VirtualNode has no meshRef or when the
// reference cannot be resolved.
func (m *defaultResourceManager) findMeshDependency(ctx context.Context, vn *appmesh.VirtualNode) (*appmesh.Mesh, error) {
	meshRef := vn.Spec.MeshRef
	if meshRef == nil {
		return nil, errors.Errorf("meshRef shouldn't be nil, please check webhook setup")
	}

	mesh, err := m.referencesResolver.ResolveMeshReference(ctx, *meshRef)
	if err == nil {
		return mesh, nil
	}
	return nil, errors.Wrapf(err, "failed to resolve meshRef")
}
// findCloudMapNamespace looks up a CloudMap namespace by name, consulting the
// namespace cache first and falling back to AWS on a cache miss. A namespace
// found in AWS is cached before being returned. Returns nil, nil if not found.
func (m *defaultResourceManager) findCloudMapNamespace(ctx context.Context, namespaceName string) (*servicediscovery.NamespaceSummary, error) {
	if cached, hit := m.namespaceSummaryCache.Get(namespaceName); hit {
		return cached.(*servicediscovery.NamespaceSummary), nil
	}

	found, err := m.findCloudMapNamespaceFromAWS(ctx, namespaceName)
	switch {
	case err != nil:
		return nil, err
	case found == nil:
		// Misses are not cached, so the next call queries AWS again.
		return nil, nil
	}
	m.namespaceSummaryCache.Add(namespaceName, found, defaultNamespaceCacheTTL)
	return found, nil
}
// findCloudMapNamespaceFromAWS pages through the CloudMap namespaces in AWS and
// returns the first one whose name equals namespaceName. Returns nil, nil if
// no namespace matches.
func (m *defaultResourceManager) findCloudMapNamespaceFromAWS(ctx context.Context, namespaceName string) (*servicediscovery.NamespaceSummary, error) {
	var match *servicediscovery.NamespaceSummary

	// scanPage returns false to stop paging as soon as a match is recorded.
	scanPage := func(page *servicediscovery.ListNamespacesOutput, _ bool) bool {
		for _, candidate := range page.Namespaces {
			if awssdk.StringValue(candidate.Name) != namespaceName {
				continue
			}
			match = candidate
			return false
		}
		return true
	}

	err := m.cloudMapSDK.ListNamespacesPagesWithContext(ctx, &servicediscovery.ListNamespacesInput{}, scanPage)
	if err != nil {
		return nil, err
	}
	return match, nil
}
// findCloudMapService looks up a CloudMap service by name inside the given
// namespace, consulting the service cache first and falling back to AWS on a
// cache miss. A service found in AWS is converted to a serviceSummary and
// cached before being returned. Returns nil, nil if not found.
func (m *defaultResourceManager) findCloudMapService(ctx context.Context, nsSummary *servicediscovery.NamespaceSummary, serviceName string) (*serviceSummary, error) {
	key := m.buildCloudMapServiceSummaryCacheKey(nsSummary, serviceName)
	if cached, hit := m.serviceSummaryCache.Get(key); hit {
		return cached.(*serviceSummary), nil
	}

	sdkSvc, err := m.findCloudMapServiceFromAWS(ctx, nsSummary, serviceName)
	if err != nil {
		return nil, err
	}
	if sdkSvc == nil {
		return nil, nil
	}

	summary := &serviceSummary{
		serviceID:               awssdk.StringValue(sdkSvc.Id),
		serviceARN:              sdkSvc.Arn,
		healthCheckCustomConfig: sdkSvc.HealthCheckCustomConfig,
	}
	m.serviceSummaryCache.Add(key, summary, defaultServiceCacheTTL)
	return summary, nil
}
// findCloudMapServiceFromAWS pages through the CloudMap services of the given
// namespace (filtered by namespace ID) and returns the first one whose name
// equals serviceName. Returns nil, nil if no service matches.
func (m *defaultResourceManager) findCloudMapServiceFromAWS(ctx context.Context, nsSummary *servicediscovery.NamespaceSummary, serviceName string) (*servicediscovery.ServiceSummary, error) {
	namespaceFilter := &servicediscovery.ServiceFilter{
		Name:   awssdk.String(servicediscovery.ServiceFilterNameNamespaceId),
		Values: []*string{nsSummary.Id},
	}
	input := &servicediscovery.ListServicesInput{
		Filters: []*servicediscovery.ServiceFilter{namespaceFilter},
	}

	var match *servicediscovery.ServiceSummary

	// scanPage returns false to stop paging as soon as a match is recorded.
	scanPage := func(page *servicediscovery.ListServicesOutput, _ bool) bool {
		for _, candidate := range page.Services {
			if awssdk.StringValue(candidate.Name) != serviceName {
				continue
			}
			match = candidate
			return false
		}
		return true
	}

	if err := m.cloudMapSDK.ListServicesPagesWithContext(ctx, input, scanPage); err != nil {
		return nil, err
	}
	return match, nil
}
// createCloudMapService creates a CloudMap service named serviceName in the
// given namespace, dispatching on the namespace type: private DNS namespaces
// get a DNS-configured service (using cloudMapTTL), HTTP namespaces get a plain
// service. Any other namespace type is rejected with an error. The created
// service is added to the service cache and returned as a serviceSummary.
func (m *defaultResourceManager) createCloudMapService(ctx context.Context, vn *appmesh.VirtualNode, nsSummary *servicediscovery.NamespaceSummary, serviceName string,
	cloudMapTTL int64) (*serviceSummary, error) {
	var (
		created *servicediscovery.Service
		err     error
	)

	switch nsType := awssdk.StringValue(nsSummary.Type); nsType {
	case servicediscovery.NamespaceTypeDnsPrivate:
		created, err = m.createCloudMapServiceUnderPrivateDNSNamespace(ctx, vn, nsSummary, serviceName, cloudMapTTL)
	case servicediscovery.NamespaceTypeHttp:
		created, err = m.createCloudMapServiceUnderHTTPNamespace(ctx, vn, nsSummary, serviceName)
	default:
		return nil, errors.Errorf("unsupported namespace type: %v, use namespace with types %v instead",
			nsType,
			[]string{servicediscovery.NamespaceTypeDnsPrivate, servicediscovery.NamespaceTypeHttp},
		)
	}

	if err != nil {
		return nil, err
	}
	return m.addCloudMapServiceToServiceSummaryCache(nsSummary, created), nil
}
// deleteCloudMapService deletes the CloudMap service described by svcSummary,
// but only when the service is owned by the given VirtualNode; a service that
// is not owned is left alone and nil is returned. On successful deletion the
// service is also evicted from the service cache.
func (m *defaultResourceManager) deleteCloudMapService(ctx context.Context, vn *appmesh.VirtualNode, nsSummary *servicediscovery.NamespaceSummary, svcSummary *serviceSummary) error {
	getOutput, err := m.cloudMapSDK.GetServiceWithContext(ctx, &servicediscovery.GetServiceInput{
		Id: awssdk.String(svcSummary.serviceID),
	})
	if err != nil {
		return errors.Wrapf(err, "failed to get cloudMap service")
	}
	sdkService := getOutput.Service

	if owned := m.isCloudMapServiceOwnedByVirtualNode(ctx, sdkService, vn); !owned {
		m.log.V(1).Info("skip cloudMap service deletion since it's not owned",
			"namespaceName", awssdk.StringValue(nsSummary.Name),
			"namespaceID", awssdk.StringValue(nsSummary.Id),
			"serviceName", awssdk.StringValue(sdkService.Name),
			"serviceID", awssdk.StringValue(sdkService.Id),
		)
		return nil
	}

	deleteInput := &servicediscovery.DeleteServiceInput{
		Id: awssdk.String(svcSummary.serviceID),
	}
	backoff := wait.Backoff{
		Duration: 15 * time.Second,
		Factor:   1.0,
		Jitter:   0.1,
		Steps:    4,
		Cap:      60 * time.Second,
	}

	// No instance count is fetched up front (this saves an additional GET).
	// The delete is simply attempted and retried only while CloudMap answers
	// with ResourceInUse — presumably because instances are still registered.
	// Any other error ends the retry loop immediately.
	isResourceInUse := func(err error) bool {
		awsErr, ok := err.(awserr.Error)
		return ok && awsErr.Code() == servicediscovery.ErrCodeResourceInUse
	}
	attemptDelete := func() error {
		_, err := m.cloudMapSDK.DeleteServiceWithContext(ctx, deleteInput)
		return err
	}
	if err := retry.OnError(backoff, isResourceInUse, attemptDelete); err != nil {
		return err
	}

	m.removeCloudMapServiceFromServiceSummaryCache(nsSummary, sdkService)
	return nil
}
// createCloudMapServiceUnderPrivateDNSNamespace creates a CloudMap service in a
// private DNS namespace. The service uses multivalue routing with a single DNS
// record whose type is AAAA when the manager's ipFamily is IPv6 and A
// otherwise, and whose TTL is cloudMapTTL. The VirtualNode UID is sent as the
// creator request ID. A custom health check config is attached when custom
// health checks are enabled.
func (m *defaultResourceManager) createCloudMapServiceUnderPrivateDNSNamespace(ctx context.Context, vn *appmesh.VirtualNode,
	nsSummary *servicediscovery.NamespaceSummary, serviceName string, cloudMapTTL int64) (*servicediscovery.Service, error) {
	recordType := servicediscovery.RecordTypeA
	if m.ipFamily == IPv6 {
		recordType = servicediscovery.RecordTypeAaaa
	}

	dnsRecord := &servicediscovery.DnsRecord{
		Type: awssdk.String(recordType),
		TTL:  awssdk.Int64(cloudMapTTL),
	}
	dnsConfig := &servicediscovery.DnsConfig{
		RoutingPolicy: awssdk.String(servicediscovery.RoutingPolicyMultivalue),
		DnsRecords:    []*servicediscovery.DnsRecord{dnsRecord},
	}
	input := &servicediscovery.CreateServiceInput{
		CreatorRequestId: awssdk.String(string(vn.UID)),
		NamespaceId:      nsSummary.Id,
		Name:             awssdk.String(serviceName),
		DnsConfig:        dnsConfig,
	}
	if m.enableCustomHealthCheck {
		input.HealthCheckCustomConfig = &servicediscovery.HealthCheckCustomConfig{
			FailureThreshold: awssdk.Int64(defaultServiceCustomHCFailureThreshold),
		}
	}

	output, err := m.cloudMapSDK.CreateServiceWithContext(ctx, input)
	if err != nil {
		return nil, err
	}
	return output.Service, nil
}
// createCloudMapServiceUnderHTTPNamespace creates a CloudMap service in an HTTP
// namespace. No DNS configuration is sent. The VirtualNode UID is used as the
// creator request ID, and a custom health check config is attached when custom
// health checks are enabled.
func (m *defaultResourceManager) createCloudMapServiceUnderHTTPNamespace(ctx context.Context, vn *appmesh.VirtualNode,
	nsSummary *servicediscovery.NamespaceSummary, serviceName string) (*servicediscovery.Service, error) {
	// Stays nil (field left unset) unless custom health checks are enabled.
	var customHC *servicediscovery.HealthCheckCustomConfig
	if m.enableCustomHealthCheck {
		customHC = &servicediscovery.HealthCheckCustomConfig{
			FailureThreshold: awssdk.Int64(defaultServiceCustomHCFailureThreshold),
		}
	}

	output, err := m.cloudMapSDK.CreateServiceWithContext(ctx, &servicediscovery.CreateServiceInput{
		CreatorRequestId:        awssdk.String(string(vn.UID)),
		NamespaceId:             nsSummary.Id,
		Name:                    awssdk.String(serviceName),
		HealthCheckCustomConfig: customHC,
	})
	if err != nil {
		return nil, err
	}
	return output.Service, nil
}
// addCloudMapServiceToServiceSummaryCache converts the given CloudMap service
// into a serviceSummary, stores it in the service cache under the
// namespace/service-name key, and returns the summary.
func (m *defaultResourceManager) addCloudMapServiceToServiceSummaryCache(nsSummary *servicediscovery.NamespaceSummary, service *servicediscovery.Service) *serviceSummary {
	summary := new(serviceSummary)
	summary.serviceID = awssdk.StringValue(service.Id)
	summary.serviceARN = service.Arn
	summary.healthCheckCustomConfig = service.HealthCheckCustomConfig

	key := m.buildCloudMapServiceSummaryCacheKey(nsSummary, awssdk.StringValue(service.Name))
	m.serviceSummaryCache.Add(key, summary, defaultServiceCacheTTL)
	return summary
}
// removeCloudMapServiceFromServiceSummaryCache evicts the given CloudMap
// service from the service cache, using the namespace/service-name key.
func (m *defaultResourceManager) removeCloudMapServiceFromServiceSummaryCache(nsSummary *servicediscovery.NamespaceSummary, service *servicediscovery.Service) {
	serviceName := awssdk.StringValue(service.Name)
	m.serviceSummaryCache.Remove(m.buildCloudMapServiceSummaryCacheKey(nsSummary, serviceName))
}
// isCloudMapServiceOwnedByVirtualNode reports whether the CloudMap service is
// owned by the VirtualNode, i.e. whether the service's creator request ID
// equals the VirtualNode's UID. Deleting the VirtualNode is responsible for
// deleting a CloudMap service it owns.
func (m *defaultResourceManager) isCloudMapServiceOwnedByVirtualNode(ctx context.Context, svc *servicediscovery.Service, vn *appmesh.VirtualNode) bool {
	creatorRequestID := awssdk.StringValue(svc.CreatorRequestId)
	ownerID := string(vn.UID)
	return creatorRequestID == ownerID
}
// buildCloudMapServiceSummaryCacheKey returns the service cache key for a
// service, in the form "<namespaceID>/<serviceName>".
func (m *defaultResourceManager) buildCloudMapServiceSummaryCacheKey(nsSummary *servicediscovery.NamespaceSummary, serviceName string) string {
	namespaceID := awssdk.StringValue(nsSummary.Id)
	return namespaceID + "/" + serviceName
}
// getClusterNodeInfo lists the cluster's Nodes and returns, keyed by node name,
// the region and availability zone read from each node's labels.
//
// The lookup is best-effort: when listing Nodes fails the error is logged and
// nil is returned, so callers proceed without node information rather than
// failing. A node lacking the relevant labels gets empty strings.
func (m *defaultResourceManager) getClusterNodeInfo(ctx context.Context) map[string]nodeAttributes {
	nodeList := &corev1.NodeList{}
	if err := m.k8sClient.List(ctx, nodeList); err != nil {
		// Deliberately not propagated; logged so the missing node info can be diagnosed.
		m.log.Error(err, "failed to list Nodes, continuing without node region/zone information")
		return nil
	}
	m.log.V(1).Info("Listed Nodes", "count", len(nodeList.Items))

	// firstLabelValue returns the value of the first of keys present in labels,
	// or "" when none is present. Looking keys up in a fixed order keeps the
	// result deterministic when a node carries more than one of them.
	firstLabelValue := func(labels map[string]string, keys ...string) string {
		for _, key := range keys {
			if value, ok := labels[key]; ok {
				return value
			}
		}
		return ""
	}

	nodeInfoByName := make(map[string]nodeAttributes, len(nodeList.Items))
	for i := range nodeList.Items {
		// Take the address to avoid copying the whole Node struct.
		node := &nodeList.Items[i]
		// The topology.kubernetes.io labels take precedence over the
		// failure-domain.beta.kubernetes.io ones.
		nodeInfoByName[node.Name] = nodeAttributes{
			region:           firstLabelValue(node.Labels, nodeRegionLabelKey2, nodeRegionLabelKey1),
			availabilityZone: firstLabelValue(node.Labels, nodeAvailabilityZoneLabelKey2, nodeAvailabilityZoneLabelKey1),
		}
	}
	return nodeInfoByName
}
// isCloudMapServiceCreated reports whether the VirtualNode carries the CloudMap
// service annotation, which Reconcile sets once a CloudMap service is known for
// the VirtualNode. The VirtualNode is only read, never modified.
func (m *defaultResourceManager) isCloudMapServiceCreated(ctx context.Context, vn *appmesh.VirtualNode) bool {
	// Indexing a nil map is safe and simply reports the key as absent.
	_, annotated := vn.Annotations[cloudMapServiceAnnotation]
	return annotated
}
// updateCRDVirtualNode records the CloudMap service ARN on the VirtualNode as
// the cloudMapServiceAnnotation annotation and patches the object in the
// cluster. It does nothing and returns nil when the summary has no ARN or when
// the annotation key is already present (an existing value is not overwritten).
// Otherwise vn is mutated in place and the error of the Patch call is returned.
func (m *defaultResourceManager) updateCRDVirtualNode(ctx context.Context, vn *appmesh.VirtualNode, svcSummary *serviceSummary) error {
	if svcSummary.serviceARN == nil {
		return nil
	}
	// Indexing a nil map is safe, so no nil guard is needed for the check.
	if _, annotated := vn.Annotations[cloudMapServiceAnnotation]; annotated {
		return nil
	}

	// Snapshot the object only when a patch is actually going to be sent; the
	// snapshot is the base against which the merge patch is computed.
	oldVN := vn.DeepCopy()
	if vn.Annotations == nil {
		vn.Annotations = make(map[string]string)
	}
	vn.Annotations[cloudMapServiceAnnotation] = *svcSummary.serviceARN
	return m.k8sClient.Patch(ctx, vn, client.MergeFrom(oldVN))
}