in source/code/actions/ec2_copy_snapshot_action.py [0:0]
def execute(self):
    """
    Copy the source snapshot to the destination region and tag both the source and the copy.

    The method re-reads the source snapshot first and skips the copy when the snapshot is no longer
    available or when its "copied to" marker tag carries a different copy serial number than the
    snapshot selected for this task instance. After a successful copy the source snapshot is tagged
    with data about the copy, and the copy receives the filtered source tags, the tags built from
    the task's tag template and the marker tags.

    :return: self.result, filled with information about the actions that were (or were not) taken
    :raises Exception: any exception raised while copying or tagging, unless running in dry-run mode,
        in which case the exception text is stored in the result under the name of the failing call
    """

    def get_tags_for_copied_snapshot():
        # tags from the source snapshot that match the configured copy filter
        snapshot_tags = self.copied_volume_tagfiter.pairs_matching_any_filter(self.snapshot.get("Tags", {}))
        snapshot_tags[actions.marker_snapshot_tag_source_source_volume_id()] = self.source_volume_id

        # tags built from the tag template parameter of the task
        snapshot_tags.update(
            self.build_tags_from_template(parameter_name=PARAM_SNAPSHOT_TAGS,
                                          region=self.source_region,
                                          tag_variables={
                                              TAG_PLACEHOLDER_SOURCE_SNAPSHOT_ID: self.source_snapshot_id,
                                              TAG_PLACEHOLDER_SOURCE_REGION: self.source_region,
                                              TAG_PLACEHOLDER_OWNER_ACCOUNT: self.owner,
                                              TAG_PLACEHOLDER_SOURCE_VOLUME: self.source_volume_id
                                          }))

        # marker tags are (re)applied last so tags built from the template cannot replace them
        snapshot_tags[Ec2CopySnapshotAction.marker_tag_source_snapshot_id()] = self.source_snapshot_id
        snapshot_tags[actions.marker_snapshot_tag_source_source_volume_id()] = self.source_volume_id

        return snapshot_tags

    def get_source_snapshot():
        # retrieves the current state of the source snapshot from the source region
        ec2 = services.create_service("ec2", session=self._session_,
                                      service_retry_strategy=get_default_retry_strategy("ec2", context=self._context_))

        return ec2.get(services.ec2_service.SNAPSHOTS,
                       region=self.source_region,
                       RestorableByUserIds=["self"],
                       Filters=[{"Name": "snapshot-id", "Values": [self.source_snapshot_id]}])

    def should_copy_snapshot():
        snapshot = get_source_snapshot()

        # source snapshot was already deleted by tasks that were in wait for execution list
        if snapshot is None:
            self.result["not-longer-available"] = True
            self._logger_.info(INF_WARNING_NO_SNAPSHOT, self.source_snapshot_id)
            return False

        # get tags from the snapshot, these must contain the mark_as_copied tag and this tag must contain the
        # same copy serial number as the snapshot that was in the selected resource for this task instance
        source_snapshot_tags = snapshot.get("Tags", {})
        marked_as_copied_tag = Ec2CopySnapshotAction.marker_tag_copied_to(self._task_)
        if marked_as_copied_tag in source_snapshot_tags:
            snapshot_copy_data = json.loads(source_snapshot_tags[marked_as_copied_tag])
        else:
            snapshot_copy_data = {}

        if snapshot_copy_data.get(COPY_SERIAL_NUMBER, "") != self.snapshot.get(COPY_SERIAL_NUMBER):
            # the copy data is a dict, its serial number must be read with get()
            self._logger_.info(INF_COPIED_BY_OTHER, snapshot_copy_data.get(TAG_REGION, ""),
                               snapshot_copy_data.get(COPY_SERIAL_NUMBER, ""))
            self.result["already-copied"] = True
            self.result["copied-data"] = snapshot_copy_data
            return False

        return True

    # logged information
    self._logger_.info("{}, version {}", self.properties[ACTION_TITLE], self.properties[ACTION_VERSION])
    self._logger_.info(INF_ACCOUNT_SNAPSHOT, self.source_snapshot_id, self._account_, self.source_region,
                       self._destination_region_)
    self._logger_.debug("Snapshot : {}", self.snapshot)

    # name of the call in progress, used as the result key for the exception text in dry-run mode
    boto_call = "copy_snapshot"
    try:
        # setup argument for CopySnapshot call
        args = {
            "SourceRegion": self.source_region,
            "SourceSnapshotId": self.source_snapshot_id
        }

        if not should_copy_snapshot():
            return self.result

        if self.encrypted:
            args["Encrypted"] = True
            self.result["encrypted"] = True
            if self.kms_key_id not in ["", None]:
                args["KmsKeyId"] = self.kms_key_id

        if self._dryrun_:
            args["DryRun"] = True

        source_description = self.snapshot.get("Description", "")

        description_variables = {
            TAG_PLACEHOLDER_SOURCE_SNAPSHOT_ID: self.source_snapshot_id,
            TAG_PLACEHOLDER_SOURCE_REGION: self.source_region,
            TAG_PLACEHOLDER_OWNER_ACCOUNT: self.owner,
            TAG_PLACEHOLDER_SOURCE_VOLUME: self.source_volume_id,
            TAG_PLACEHOLDER_SOURCE_DESCRIPTION: source_description
        }

        args["Description"] = self.build_str_from_template(parameter_name=PARAM_SNAPSHOT_DESCRIPTION,
                                                           region=self.source_region,
                                                           tag_variables=description_variables)
        # fall back to the description of the source snapshot if the template gives an empty description
        if args["Description"] == "":
            args["Description"] = source_description

        # start the copy
        resp = self.ec2_destination_client.copy_snapshot_with_retries(**args)

        # id of the copy
        copy_snapshot_id = resp.get("SnapshotId")
        self._logger_.info(INF_SNAPSHOT_COPIED, self.source_snapshot_id, self._destination_region_, copy_snapshot_id)
        self.result[boto_call] = resp
        self.result["copy-snapshot-id"] = copy_snapshot_id

        # update the tag that marks the snapshot as being copied
        boto_call = "create_tags (source)"
        copied_tag_name = Ec2CopySnapshotAction.marker_tag_copied_to(self._task_)

        copy_data_tag = {
            copied_tag_name: safe_json(
                {
                    TAG_REGION: self._destination_region_,
                    COPY_SERIAL_NUMBER: self.snapshot.get(COPY_SERIAL_NUMBER, ""),
                    TAG_COPIED_BY_TASK: self.get(ACTION_PARAM_TASK_ID, ""),
                    TAG_COPY_SNAPSHOT_ID: copy_snapshot_id
                })
        }

        self.ec2_source_client.create_tags_with_retries(Resources=[self.source_snapshot_id],
                                                        Tags=tag_key_value_list(copy_data_tag))

        # set tags on the copy
        boto_call = "create_tags (target)"
        tags = get_tags_for_copied_snapshot()
        self._logger_.info(INF_CREATE_COPIED_TAGS, tags, self._account_)

        if tags:
            tagging.set_ec2_tags(ec2_client=self.ec2_destination_client,
                                 resource_ids=[copy_snapshot_id],
                                 tags=tags,
                                 logger=self._logger_)

            self.result["tags"] = tags
            self._logger_.info(INF_TAGS_CREATED)

    except Exception as ex:
        # in dry-run mode the exception is stored in the result instead of failing the task
        if self._dryrun_:
            self._logger_.debug(str(ex))
            self.result[boto_call] = str(ex)
            return self.result
        raise

    self.result[METRICS_DATA] = build_action_metrics(self, CopiedSnapshots=1)

    return self.result