in source/code/actions/ec2_create_snapshot_action.py [0:0]
def is_completed(self, snapshot_create_data):
    """
    Tests whether the snapshots started for the volumes of an instance are done and, if so, finalizes them.

    When every snapshot has left the pending state and none is in the error state, the source instance and
    its volumes are tagged, create-volume permissions are granted to the configured accounts and, when
    enabled, the snapshots are tagged in the accounts they are shared with.

    :param snapshot_create_data: dict with the keys "instance", "volumes" (dict whose values hold the
    "create_snapshot" response containing the "SnapshotId"), "start-time" (only read when not all snapshots
    are returned by the describe call) and optionally "snapshots" (dict keyed by snapshot id holding "tags")
    :return: None as long as not all snapshots are visible yet (first 5 minutes) or at least one is pending,
    otherwise a dict {"InstanceId": ..., "Volumes": [{"VolumeId", "SnapshotId", "State", "Progress"}, ...]}
    :raises Exception: if a snapshot is in the error state, if snapshots are still missing after the grace
    period, or if setting tags fails
    """

    def grant_create_volume_permissions(snap_ids):
        # nothing to do if the snapshots are not shared with other accounts
        if not self.accounts_with_create_permissions:
            return

        # strip the account ids once, they are the same for every snapshot
        account_ids = [a.strip() for a in self.accounts_with_create_permissions]
        args = {
            "CreateVolumePermission": {
                "Add": [{"UserId": account_id} for account_id in account_ids]
            }
        }

        for snapshot_id in snap_ids:
            args["SnapshotId"] = snapshot_id
            try:
                self.ec2_client.modify_snapshot_attribute_with_retries(**args)
                self._logger_.info(INFO_SETTING_CREATE_VOLUME_PERMISSIONS,
                                   ", ".join(self.accounts_with_create_permissions))
                self.result["create-volume-access-accounts"] = list(account_ids)
            except Exception as ex:
                # NOTE(review): raise_exception is defined elsewhere, presumably it formats the message and raises
                raise_exception(ERR_SETTING_CREATE_VOLUME_PERMISSIONS, self.accounts_with_create_permissions, ex)

    def tag_shared_snapshots(snapshot_data, snap_ids):
        # only tag in the other accounts if the snapshots are shared and tagging of shared snapshots is enabled
        if self.accounts_with_create_permissions in ["", None] or not self.tag_shared_snapshots:
            return

        for account in self.accounts_with_create_permissions:

            session_for_tagging = self.get_action_session(account=account,
                                                          param_name=PARAM_SHARED_ACCOUNT_TAGGING_ROLENAME,
                                                          logger=self._logger_)

            # without a session for the account its snapshots cannot be tagged, log and try the next account
            if session_for_tagging is None:
                self._logger_.error(ERR_TAGS_NOT_SET_IN_ACCOUNT, account)
                continue

            try:
                ec2_client = get_client_with_retries(service_name="ec2",
                                                     methods=[
                                                         "create_tags",
                                                         "delete_tags"
                                                     ],
                                                     context=self._context_,
                                                     region=self._region_,
                                                     session=session_for_tagging,
                                                     logger=self._logger_)

                for snap_id in snap_ids:
                    tags = snapshot_data.get(snap_id, {}).get("tags", None)
                    if tags is not None:
                        self._logger_.info(INFO_SET_SNAPSHOT_TAGS_SHARED, safe_json(tags, indent=3), snap_id, account,
                                           self._region_)
                        tagging.set_ec2_tags(ec2_client=ec2_client,
                                             resource_ids=[snap_id],
                                             tags=tags,
                                             logger=self._logger_)
            except Exception as ex:
                # keep the original exception as the cause so the traceback is not lost
                raise Exception(ERR_SETTING_SHARED_TAGS.format(account, str(ex))) from ex

    def set_volume_tags(volume_id, snap_id):
        tags = self.build_tags_from_template(parameter_name=PARAM_VOLUME_TAGS,
                                             tag_variables={
                                                 TAG_PLACEHOLDER_VOLUME_SNAPSHOT: snap_id
                                             })
        if not tags:
            return

        try:
            tagging.set_ec2_tags(ec2_client=self.ec2_client,
                                 resource_ids=[volume_id],
                                 tags=tags,
                                 logger=self._logger_)
            self._logger_.info(INFO_SET_VOLUME_TAGS, safe_json(tags, indent=3), volume_id)
        except Exception as ex:
            raise Exception(ERR_SETTING_VOLUME_TAGS.format(self.instance_id, ex)) from ex

    def set_instance_tags(snap_ids):
        tags = self.build_tags_from_template(parameter_name=PARAM_INSTANCE_TAGS,
                                             tag_variables={
                                                 TAG_PLACEHOLDER_INSTANCE_SNAPSHOTS: ','.join(sorted(snap_ids))
                                             })
        if not tags:
            return

        try:
            self.set_ec2_instance_tags_with_event_loop_check(instance_ids=[self.instance_id],
                                                             tags_to_set=tags,
                                                             client=self.ec2_client,
                                                             region=self._region_)
            self._logger_.info(INFO_SET_INSTANCE_TAGS, safe_json(tags, indent=3), self.instance_id)
        except Exception as ex:
            raise Exception(ERR_SETTING_INSTANCE_TAGS.format(self.instance_id, ex)) from ex

    # ids of the snapshots that were started; volumes without a recorded snapshot id have nothing to wait for
    # and must be skipped, joining a None entry would raise a TypeError
    recorded_ids = ((volume.get("create_snapshot") or {}).get("SnapshotId")
                    for volume in snapshot_create_data.get("volumes", {}).values())
    snapshot_ids = [snapshot_id for snapshot_id in recorded_ids if snapshot_id is not None]

    # no snapshots to check, the action is completed
    if not snapshot_ids:
        return {
            "InstanceId": snapshot_create_data["instance"],
            "Volumes": []
        }

    self._logger_.info(INFO_CHECKING_SNAPSHOT_STATUS, ",".join(snapshot_ids))

    # create service instance to test if snapshots are available
    ec2 = services.create_service("ec2", session=self._session_,
                                  service_retry_strategy=get_default_retry_strategy("ec2", context=self._context_))

    # test if the snapshots with the ids that were returned from the CreateSnapshot API call exist and are completed
    snapshots = list(ec2.describe(services.ec2_service.SNAPSHOTS,
                                  OwnerIds=["self"],
                                  region=self.instance["Region"],
                                  Filters=[
                                      {
                                          "Name": "snapshot-id", "Values": snapshot_ids
                                      }
                                  ]))

    if len(snapshots) != len(snapshot_ids):
        # allow 5 minutes for all snapshots to appear
        start_time = dateutil.parser.parse(snapshot_create_data["start-time"])
        if self._datetime_.now() - start_time < timedelta(minutes=5):
            self._logger_.info(INFO_NOT_ALL_IN_PROGRESS)
            return None

    test_result = {
        "InstanceId": snapshot_create_data["instance"],
        "Volumes": [{
            "VolumeId": s["VolumeId"],
            "SnapshotId": s["SnapshotId"],
            "State": s["State"],
            "Progress": s["Progress"]
        } for s in snapshots]
    }

    self._logger_.info(INFO_STATE_SNAPSHOTS, safe_json(test_result, indent=3))

    # wait until all snapshots are no longer pending
    if any(volume["State"] == SNAPSHOT_STATE_PENDING for volume in test_result["Volumes"]):
        self._logger_.info(INFO_CREATION_PENDING)
        return None

    # collect possible failed snapshots
    failed = [volume for volume in test_result["Volumes"] if volume["State"] == SNAPSHOT_STATE_ERROR]
    if failed:
        raise Exception(
            ",".join([ERR_FAILED_SNAPSHOT.format(volume["SnapshotId"], volume["VolumeId"]) for volume in failed]))

    # snapshots that did not show up within the grace period are reported as missing
    if len(snapshot_ids) != len(snapshots):
        created_snapshots = {s["SnapshotId"] for s in snapshots}
        raise Exception(ERR_MISSING_SNAPSHOTS.format(
            ",".join([snapshot_id for snapshot_id in snapshot_ids if snapshot_id not in created_snapshots])))

    # from here on use the ids of the snapshots as returned by the describe call
    snapshot_ids = [s["SnapshotId"] for s in snapshots]

    # set tags on source instance
    set_instance_tags(snapshot_ids)

    # set tags on the source volumes
    for s in snapshots:
        set_volume_tags(volume_id=s["VolumeId"], snap_id=s["SnapshotId"])

    # set permissions to create volumes from snapshots
    grant_create_volume_permissions(snapshot_ids)

    # tag resources in accounts the snapshots are shared with
    tag_shared_snapshots(snapshot_create_data.get("snapshots", {}), snapshot_ids)

    self._logger_.info(INFO_COMPLETED)
    return test_result