in Lambda/TagOnRestore/src/TagOnRestore.py [0:0]
def __get_tags_by_resource(self, resource_type, resource_arn, recovery_point_arn):
    """
    Helper function to get Tags for a given resource detail.

    Collects the tags of the original resource through
    ``self.get_resource_tags`` and, when that yields no tags, retries with the
    resource id derived from ``recovery_point_arn``. For EFS resources the
    tags stored on the AWS Backup recovery point are fetched as well and
    appended to the result. A ``Name`` tag is added when none is present.

    Args:
        resource_type: Resource type name; compared case-insensitively.
        resource_arn: ARN of the original (backed-up) resource.
        recovery_point_arn: ARN of the recovery point used for the restore.

    Returns:
        dict: The resource tag response with ``ResponseMetadata`` removed and
        a ``'Tags'`` list of ``{'Key': ..., 'Value': ...}`` entries. The list
        always exists, even when no tags could be retrieved.

    Raises:
        botocore.exceptions.ClientError: If the fallback lookup against the
            recovery point fails (only the first lookup is guarded).
    """
    logger.info(f"__get_tags_by_resource resource_type {resource_type}, " +
                f"resourceArn : {resource_arn}, recovery_point_arn : {recovery_point_arn}")
    resource_id = self.get_resource_id_from_arn(resource_arn)
    resource_tag_list = {}
    recovery_point_tag_list = {}
    resource_type = resource_type.lower()

    if resource_type in ('elasticfilesystem', 'efs'):
        # Try getting tags from AWS Backup for supported resources.
        # The Backup client is only needed on this path, so create it here.
        # boto3 API /services/backup.html#Backup.Client.list_tags
        backup_client = boto3.client('backup')
        try:
            recovery_point_tag_list = backup_client.list_tags(ResourceArn=recovery_point_arn)
            recovery_point_tag_list.pop('ResponseMetadata', None)
        except botocore.exceptions.ClientError as e:
            # Suppress as only EFS supported as of April 2021 -
            # AccessDeniedException (Not supported)
            # and InvalidParameterValueException (Missing recovery point)
            logger.error(f"{e.response['Error']}")

    try:
        resource_tag_list = self.get_resource_tags(resource_type, resource_id, resource_arn)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "ResourceNotFoundException":
            # Try to get the tags from the recovery point (handled below).
            logger.error("Resource not fouund. Falling back to recovery_point_arn")
        else:
            # Still best-effort, but do not hide the failure: log it so
            # throttling / permission problems are visible.
            logger.error(f"{e.response['Error']}")

    if not resource_tag_list or not resource_tag_list.get('Tags'):
        logger.info('No tags extracted from original resource. Attempting tag extraction from recovery_point_arn')
        resource_id = self.get_resource_id_from_arn(recovery_point_arn)
        # NOTE(review): the original resource_arn is still passed here while
        # resource_id now comes from the recovery point - confirm that this is
        # what get_resource_tags expects.
        resource_tag_list = self.get_resource_tags(resource_type, resource_id, resource_arn)

    # Normalise the response so the code below can rely on a 'Tags' list even
    # when the fallback returned nothing usable (previously a KeyError).
    if not resource_tag_list:
        resource_tag_list = {}
    resource_tag_list.pop('ResponseMetadata', None)
    if not resource_tag_list.get('Tags'):
        resource_tag_list['Tags'] = []
    tags = resource_tag_list['Tags']

    name_exist = False
    for tag_info in tags:
        if tag_info['Key'] == 'Name':
            name_exist = True
        # Strip per-tag bookkeeping fields so only Key/Value remain.
        tag_info.pop('ResourceId', None)
        tag_info.pop('ResourceType', None)

    if not name_exist:
        # Handle RDS Naming arn:aws:rds:*:*:db:{DBName}
        resource_id = resource_id.split(':')[-1]
        # Handle instance/{InstanceId}
        resource_id = resource_id.split('/')[-1]
        logger.info(f"Name Tag not Found.Adding Name Tag with ResourceId : {resource_id}")
        tags.append({'Key': 'Name', 'Value': resource_id})

    try:
        if 'Tags' in recovery_point_tag_list:
            # Merge the tags - From resource and Recovery point.
            # NOTE(review): keys already present on the resource are appended
            # again here; confirm the consumer tolerates duplicate keys.
            for recovery_point_tag_key, recovery_point_tag_value in \
                    recovery_point_tag_list['Tags'].items():
                logger.info(
                    f"Appending recovery point Tags : {recovery_point_tag_key},{recovery_point_tag_value} to tags : {tags} ")
                tags.append(
                    {'Key': recovery_point_tag_key, 'Value': recovery_point_tag_value})
    except Exception as e:
        # Merging is best-effort: log the failure and return what we have.
        logger.error(f"Error merging tags : {e}")
        var = traceback.format_exc()
        logger.error(f"Error {var} processing Merge the tags from resource and Recovery point")

    logger.info(f"__get_tags_by_resource, Resource Tag List : {resource_tag_list['Tags']}")
    return resource_tag_list