def is_completed()

in source/code/actions/ec2_copy_snapshot_action.py [0:0]


    def is_completed(self, snapshot_create_data):
        def delete_source_after_copy():
            self._logger_.info(INF_DELETING_SNAPSHOT, self.source_snapshot_id)
            self.ec2_source_client.delete_snapshot_with_retries(SnapshotId=self.source_snapshot_id)
            self._logger_.info(INF_SNAPSHOT_DELETED, self.source_snapshot_id, self.source_region)

        def source_tags(copy_id, source_tags_param):
            snapshot_tags = {}
            snapshot_tags.update(
                self.build_tags_from_template(parameter_name=source_tags_param,
                                              region=self.source_region,
                                              tag_variables={
                                                  TAG_PLACEHOLDER_COPIED_SNAPSHOT_ID: copy_id,
                                                  TAG_PLACEHOLDER_COPIED_REGION: self._destination_region_
                                              }))
            return snapshot_tags

        def set_source_snapshot_tags(copy_id):
            snapshot_tags = source_tags(copy_id, PARAM_SOURCE_TAGS)
            if len(snapshot_tags) == 0:
                return

            self._logger_.info(INF_CREATE_SOURCE_TAGS, snapshot_tags, self._account_)

            if len(snapshot_tags) > 0:
                tagging.set_ec2_tags(ec2_client=self.ec2_source_client,
                                     resource_ids=[self.source_snapshot_id],
                                     tags=snapshot_tags,
                                     logger=self._logger_)

                self._logger_.info(INF_TAGS_CREATED)

        def grant_create_volume_permissions(snap_id):

            if self.accounts_with_create_permissions is not None and len(self.accounts_with_create_permissions) > 0:

                args = {

                    "CreateVolumePermission": {
                        "Add": [{"UserId": a.strip()} for a in self.accounts_with_create_permissions]

                    },
                    "SnapshotId": snap_id
                }

                try:
                    self.ec2_destination_client.modify_snapshot_attribute_with_retries(**args)
                    self._logger_.info(INF_SETTING_CREATE_VOLUME_PERMISSIONS, ", ".join(self.accounts_with_create_permissions))
                except Exception as ex:
                    raise_exception(ERR_SETTING_CREATE_VOLUME_PERMISSIONS, self.accounts_with_create_permissions, ex)

        def tag_shared_snapshots(tags, snap_id):
            # creates tags for snapshots that have been shared in account the snapshots are shared with

            if len(tags) == 0 or not self.tag_snapshots_in_shared_accounts:
                return

            if self.accounts_with_create_permissions in ["", None]:
                return

            for account in self.accounts_with_create_permissions:

                session_for_tagging = self.get_action_session(account=account,
                                                              param_name=PARAM_DESTINATION_ACCOUNT_TAG_ROLENAME,
                                                              logger=self._logger_)

                if session_for_tagging is None:
                    self._logger_.error(ERR_TAGS_NOT_SET_IN_ACCOUNT, account)
                    continue

                try:
                    ec2_client = get_client_with_retries(service_name="ec2",
                                                         methods=[
                                                             "create_tags",
                                                             "delete_tags"
                                                         ],
                                                         context=self._context_,
                                                         region=self.get(PARAM_DESTINATION_REGION),
                                                         session=session_for_tagging,
                                                         logger=self._logger_)

                    tagging.set_ec2_tags(ec2_client=ec2_client,
                                         resource_ids=[snap_id],
                                         tags=tags,
                                         logger=self._logger_)

                    self._logger_.info(INF_CREATE_SHARED_TAGS, tags, account)

                except Exception as ex:
                    raise_exception(ERR_SETTING_SHARED_TAGS, account, str(ex))

        def tag_shared_source_snapshot(copy_id):
            # created tags for snapshots for shared snapshots in the source account of the shares snapshots
            snapshot_tags = source_tags(copy_id, PARAM_SOURCE_SHARED_BY_TAGS)
            if len(snapshot_tags) == 0 or not self.tag_snapshots_in_source_account:
                return

            # only for snapshots that have been shared by other account
            if self.owner == self.get_account_for_task():
                self._logger_.debug("Account {} is owner, no tags set for snapshot {} in account of owner", self._account_,
                                    self.source_snapshot_id)
                return

            session_for_tagging = self.get_action_session(account=self.owner,
                                                          param_name=PARAM_SOURCE_ACCOUNT_TAG_ROLE_NAME,
                                                          logger=self._logger_)

            if session_for_tagging is None:
                self._logger_.error(ERR_TAGS_NOT_SET_IN_ACCOUNT, self.owner)
                return

            try:

                self._logger_.info(INF_CREATE_SHARED_ACCOUNT_SNAPSHOT_TAGS, snapshot_tags, self.source_snapshot_id,
                                   self.owner)
                ec2_client = get_client_with_retries(service_name="ec2",
                                                     methods=[
                                                         "create_tags",
                                                         "delete_tags"
                                                     ],
                                                     context=self._context_,
                                                     region=self.source_region,
                                                     session=session_for_tagging,
                                                     logger=self._logger_)

                tagging.set_ec2_tags(ec2_client=ec2_client,
                                     resource_ids=[self.source_snapshot_id],
                                     tags=snapshot_tags,
                                     logger=self._logger_)

            except Exception as ex:
                raise_exception(ERR_SETTING_SOURCE_SHARED_TAGS, self.owner, str(ex))

        if snapshot_create_data.get("already-copied", False):
            self._logger_.info(INF_COMPLETE_ALREADY_COPIED, self.source_snapshot_id)
            return self.result

        if snapshot_create_data.get("not-longer-available", False):
            self._logger_.info(INF_COMPLETED_NOT_LONGER_AVAILABLE, self.source_snapshot_id)
            return self.result

        # create service instance to test if snapshot exists
        ec2 = services.create_service("ec2", session=self._session_,
                                      service_retry_strategy=get_default_retry_strategy("ec2", context=self._context_))

        copy_snapshot_id = snapshot_create_data["copy-snapshot-id"]
        # test if the snapshot with the id that was returned from the CopySnapshot API call exists and is completed
        copied_snapshot = ec2.get(services.ec2_service.SNAPSHOTS,
                                  region=self._destination_region_,
                                  OwnerIds=["self"],
                                  Filters=[
                                      {
                                          "Name": "snapshot-id", "Values": [copy_snapshot_id]
                                      }
                                  ])

        if copied_snapshot is not None:
            self._logger_.debug(INF_CHECK_COMPLETED_RESULT, copied_snapshot)

        state = copied_snapshot["State"] if copied_snapshot is not None else None

        if copied_snapshot is None or state == SNAPSHOT_STATE_PENDING:
            self._logger_.info(INF_COPY_PENDING, copy_snapshot_id, self._destination_region_)
            return None

        if state == SNAPSHOT_STATE_ERROR:
            copied_tag_name = Ec2CopySnapshotAction.marker_tag_copied_to(self._task_)
            self.ec2_source_client.delete_tags_with_retries(Resources=[self.source_snapshot_id],
                                                            Tags=[
                                                                {
                                                                    "Key": copied_tag_name
                                                                }
                                                            ])
            raise_exception(ERR_COPY_SNAPSHOT)

        if state == SNAPSHOT_STATE_COMPLETED:
            self._logger_.info(INF_COPY_COMPLETED, self.source_snapshot_id, self.source_region, copy_snapshot_id,
                               self._destination_region_)
            grant_create_volume_permissions(copy_snapshot_id)
            tag_shared_snapshots(snapshot_create_data.get("tags", {}), copy_snapshot_id)
            tag_shared_source_snapshot(copy_snapshot_id)
            if self.delete_after_copy:
                delete_source_after_copy()
            else:
                set_source_snapshot_tags(copy_snapshot_id)

            # wait there for 15 seconds as count the limit for max number of concurrent snapshot copies
            # by the EC2 service is sometimes delayed
            time.sleep(5)

            return copied_snapshot

        return None