in source/code/actions/ec2_copy_snapshot_action.py [0:0]
def process_and_select_resource(service, logger, resource_name, resource, context, task, task_assumed_role):
def get_snapshot_tags(client, snap_id):
try:
resp = client.describe_snapshots_with_retries(RestorableByUserIds=["self"], SnapshotIds=[snap_id])
list_of_tags = resp.get("Snapshots", [{}])[0].get("Tags", [])
return {tag["Key"].strip(): tag.get("Value", "").strip() for tag in list_of_tags}, True
except Exception as ex:
if getattr(ex, "response", {}).get("Error", {}).get("Code", "") == "InvalidSnapshot.NotFound":
return {}, False
else:
raise ex
def mark_as_being_selected_for_copy(client, snapshot):
try:
tag_name = Ec2CopySnapshotAction.marker_tag_copied_to(task[handlers.TASK_NAME])
# Serial number for copy. This is stored in the tag of the snapshot anf the stored resources in the task
# Before starting a copy there will be check if these match to avoid double copied of a snapshot
copy_serial = str(uuid.uuid4())
tag_data = {
tag_name: safe_json(
{
TAG_REGION: task.get(handlers.TASK_PARAMETERS, {}).get(PARAM_DESTINATION_REGION, ""),
COPY_SERIAL_NUMBER: copy_serial,
TAG_COPY_SNAPSHOT_ID: ""
})
}
client.create_tags_with_retries(Resources=[snapshot["SnapshotId"]],
Tags=tag_key_value_list(tag_data))
# store the copy serial number as part of the selected resource
resource[COPY_SERIAL_NUMBER] = copy_serial
except Exception as ex:
logger.warning(WARN_SETTING_COPIED_TAG, snapshot["SnapshotId"], ex)
# source snapshot
snapshot_id = resource["SnapshotId"]
# owner of the snapshot
snapshot_owner = resource["OwnerId"]
parameters = task.get(handlers.TASK_PARAMETERS, {})
# copy owned, shared or both
copied_snapshot_types = parameters[PARAM_COPIED_SNAPSHOTS]
this_account = task.get(TASK_THIS_ACCOUNT, False)
accounts = task.get(TASK_ACCOUNTS, [])
if this_account and len(accounts) == 0:
account = os.getenv(handlers.ENV_OPS_AUTOMATOR_ACCOUNT)
elif not this_account and len(accounts) == 1:
account = accounts[0]
else:
account = services.account_from_role_arn(task_assumed_role)
if copied_snapshot_types == COPIED_OWNED_BY_ACCOUNT and account != snapshot_owner:
logger.debug(DEBUG_ONLY_COPY_OWNED_SNAPSHOTS, snapshot_id, snapshot_owner, PARAM_COPIED_SNAPSHOTS, account)
return None
if copied_snapshot_types == COPIED_SNAPSHOTS_SHARED_TO_ACCOUNT and account == snapshot_owner:
logger.debug(DEBUG_ONLY_COPY_SHARED_SNAPSHOTS, snapshot_id, snapshot_owner, PARAM_COPIED_SNAPSHOTS, account)
return None
copy_from_accounts = parameters.get(PARAM_COPY_FROM_OWNER_ACCOUNTS, None)
if copy_from_accounts not in [None, []]:
if copied_snapshot_types == COPIED_OWNED_BY_ACCOUNT:
raise_value_error(ERR_ACCOUNTS_BUT_NOT_SHARED, PARAM_COPY_FROM_OWNER_ACCOUNTS, PARAM_COPIED_SNAPSHOTS)
if snapshot_owner != account and snapshot_owner not in [a.strip() for a in copy_from_accounts]:
logger.debug(DEBUG_SHARED_SNAPSHOT_OWNER_NOT_IN_LIST, snapshot_id, snapshot_owner, ",".join(copy_from_accounts))
return None
# name of tag that is used to mark snapshots being copied
copied_tag_name = Ec2CopySnapshotAction.marker_tag_copied_to(task[handlers.TASK_NAME])
if copied_tag_name in resource.get("Tags", {}):
# noinspection PyBroadException
try:
logger.debug("Snapshot already copied or being copied, copy data is:\n {}",
safe_json(json.loads(resource.get("Tags", {}).get(copied_tag_name, {}))))
except Exception:
pass
return None
# ec2 client for getting most current tag values and setting tags
ec2 = get_client_with_retries(service_name="ec2", methods=["create_tags", "describe_snapshots"],
region=resource["Region"],
context=context,
session=service.session,
logger=logger)
# get the most current tags as they might be changed by overlapping copy tasks
tags, snapshot_found = get_snapshot_tags(ec2, snapshot_id)
# snapshot no longer there
if not snapshot_found:
logger.debug("Snapshot {} not longer available", snapshot_id)
return None
if copied_tag_name in tags:
return None
mark_as_being_selected_for_copy(ec2, resource)
return resource