in source/code/actions/ec2_copy_snapshot_action.py [0:0]
def is_completed(self, snapshot_create_data):
def delete_source_after_copy():
self._logger_.info(INF_DELETING_SNAPSHOT, self.source_snapshot_id)
self.ec2_source_client.delete_snapshot_with_retries(SnapshotId=self.source_snapshot_id)
self._logger_.info(INF_SNAPSHOT_DELETED, self.source_snapshot_id, self.source_region)
def source_tags(copy_id, source_tags_param):
snapshot_tags = {}
snapshot_tags.update(
self.build_tags_from_template(parameter_name=source_tags_param,
region=self.source_region,
tag_variables={
TAG_PLACEHOLDER_COPIED_SNAPSHOT_ID: copy_id,
TAG_PLACEHOLDER_COPIED_REGION: self._destination_region_
}))
return snapshot_tags
def set_source_snapshot_tags(copy_id):
snapshot_tags = source_tags(copy_id, PARAM_SOURCE_TAGS)
if len(snapshot_tags) == 0:
return
self._logger_.info(INF_CREATE_SOURCE_TAGS, snapshot_tags, self._account_)
if len(snapshot_tags) > 0:
tagging.set_ec2_tags(ec2_client=self.ec2_source_client,
resource_ids=[self.source_snapshot_id],
tags=snapshot_tags,
logger=self._logger_)
self._logger_.info(INF_TAGS_CREATED)
def grant_create_volume_permissions(snap_id):
if self.accounts_with_create_permissions is not None and len(self.accounts_with_create_permissions) > 0:
args = {
"CreateVolumePermission": {
"Add": [{"UserId": a.strip()} for a in self.accounts_with_create_permissions]
},
"SnapshotId": snap_id
}
try:
self.ec2_destination_client.modify_snapshot_attribute_with_retries(**args)
self._logger_.info(INF_SETTING_CREATE_VOLUME_PERMISSIONS, ", ".join(self.accounts_with_create_permissions))
except Exception as ex:
raise_exception(ERR_SETTING_CREATE_VOLUME_PERMISSIONS, self.accounts_with_create_permissions, ex)
def tag_shared_snapshots(tags, snap_id):
# creates tags for snapshots that have been shared in account the snapshots are shared with
if len(tags) == 0 or not self.tag_snapshots_in_shared_accounts:
return
if self.accounts_with_create_permissions in ["", None]:
return
for account in self.accounts_with_create_permissions:
session_for_tagging = self.get_action_session(account=account,
param_name=PARAM_DESTINATION_ACCOUNT_TAG_ROLENAME,
logger=self._logger_)
if session_for_tagging is None:
self._logger_.error(ERR_TAGS_NOT_SET_IN_ACCOUNT, account)
continue
try:
ec2_client = get_client_with_retries(service_name="ec2",
methods=[
"create_tags",
"delete_tags"
],
context=self._context_,
region=self.get(PARAM_DESTINATION_REGION),
session=session_for_tagging,
logger=self._logger_)
tagging.set_ec2_tags(ec2_client=ec2_client,
resource_ids=[snap_id],
tags=tags,
logger=self._logger_)
self._logger_.info(INF_CREATE_SHARED_TAGS, tags, account)
except Exception as ex:
raise_exception(ERR_SETTING_SHARED_TAGS, account, str(ex))
def tag_shared_source_snapshot(copy_id):
# created tags for snapshots for shared snapshots in the source account of the shares snapshots
snapshot_tags = source_tags(copy_id, PARAM_SOURCE_SHARED_BY_TAGS)
if len(snapshot_tags) == 0 or not self.tag_snapshots_in_source_account:
return
# only for snapshots that have been shared by other account
if self.owner == self.get_account_for_task():
self._logger_.debug("Account {} is owner, no tags set for snapshot {} in account of owner", self._account_,
self.source_snapshot_id)
return
session_for_tagging = self.get_action_session(account=self.owner,
param_name=PARAM_SOURCE_ACCOUNT_TAG_ROLE_NAME,
logger=self._logger_)
if session_for_tagging is None:
self._logger_.error(ERR_TAGS_NOT_SET_IN_ACCOUNT, self.owner)
return
try:
self._logger_.info(INF_CREATE_SHARED_ACCOUNT_SNAPSHOT_TAGS, snapshot_tags, self.source_snapshot_id,
self.owner)
ec2_client = get_client_with_retries(service_name="ec2",
methods=[
"create_tags",
"delete_tags"
],
context=self._context_,
region=self.source_region,
session=session_for_tagging,
logger=self._logger_)
tagging.set_ec2_tags(ec2_client=ec2_client,
resource_ids=[self.source_snapshot_id],
tags=snapshot_tags,
logger=self._logger_)
except Exception as ex:
raise_exception(ERR_SETTING_SOURCE_SHARED_TAGS, self.owner, str(ex))
if snapshot_create_data.get("already-copied", False):
self._logger_.info(INF_COMPLETE_ALREADY_COPIED, self.source_snapshot_id)
return self.result
if snapshot_create_data.get("not-longer-available", False):
self._logger_.info(INF_COMPLETED_NOT_LONGER_AVAILABLE, self.source_snapshot_id)
return self.result
# create service instance to test if snapshot exists
ec2 = services.create_service("ec2", session=self._session_,
service_retry_strategy=get_default_retry_strategy("ec2", context=self._context_))
copy_snapshot_id = snapshot_create_data["copy-snapshot-id"]
# test if the snapshot with the id that was returned from the CopySnapshot API call exists and is completed
copied_snapshot = ec2.get(services.ec2_service.SNAPSHOTS,
region=self._destination_region_,
OwnerIds=["self"],
Filters=[
{
"Name": "snapshot-id", "Values": [copy_snapshot_id]
}
])
if copied_snapshot is not None:
self._logger_.debug(INF_CHECK_COMPLETED_RESULT, copied_snapshot)
state = copied_snapshot["State"] if copied_snapshot is not None else None
if copied_snapshot is None or state == SNAPSHOT_STATE_PENDING:
self._logger_.info(INF_COPY_PENDING, copy_snapshot_id, self._destination_region_)
return None
if state == SNAPSHOT_STATE_ERROR:
copied_tag_name = Ec2CopySnapshotAction.marker_tag_copied_to(self._task_)
self.ec2_source_client.delete_tags_with_retries(Resources=[self.source_snapshot_id],
Tags=[
{
"Key": copied_tag_name
}
])
raise_exception(ERR_COPY_SNAPSHOT)
if state == SNAPSHOT_STATE_COMPLETED:
self._logger_.info(INF_COPY_COMPLETED, self.source_snapshot_id, self.source_region, copy_snapshot_id,
self._destination_region_)
grant_create_volume_permissions(copy_snapshot_id)
tag_shared_snapshots(snapshot_create_data.get("tags", {}), copy_snapshot_id)
tag_shared_source_snapshot(copy_snapshot_id)
if self.delete_after_copy:
delete_source_after_copy()
else:
set_source_snapshot_tags(copy_snapshot_id)
# wait there for 15 seconds as count the limit for max number of concurrent snapshot copies
# by the EC2 service is sometimes delayed
time.sleep(5)
return copied_snapshot
return None