awscli/customizations/dlm/createdefaultrole.py (111 lines of code) (raw):
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Class to create default roles for lifecycle
import logging
from awscli.clidriver import CLIOperationCaller
from awscli.customizations.utils import get_policy_arn_suffix
from awscli.customizations.commands import BasicCommand
from awscli.customizations.dlm.iam import IAM
from awscli.customizations.dlm.constants \
import RESOURCES, \
LIFECYCLE_DEFAULT_ROLE_ASSUME_POLICY, \
POLICY_ARN_PATTERN, \
RESOURCE_TYPE_SNAPSHOT, \
RESOURCE_TYPE_IMAGE
# Module-level logger named after this module; the command below uses it
# for its debug messages.
LOG = logging.getLogger(__name__)
def _construct_result(create_role_response, get_policy_response):
get_policy_response.pop('ResponseMetadata', None)
create_role_response.pop('ResponseMetadata', None)
result = {'RolePolicy': get_policy_response}
result.update(create_role_response)
return result
# Display the result as formatted json
def display_response(session, operation_name, result, parsed_globals):
    """Hand ``result`` to the CLI operation caller for display.

    Does nothing when ``result`` is None. Otherwise a CLIOperationCaller is
    built from ``session`` and asked to display the result for
    ``operation_name`` using ``parsed_globals``.
    """
    if result is None:
        return

    # NOTE: _display_response is a private method of CLIOperationCaller.
    # This call should be replaced once that functionality is exposed
    # outside of CLIOperationCaller.
    caller = CLIOperationCaller(session)
    caller._display_response(operation_name, result, parsed_globals)
# Get policy arn from region and policy name
def get_policy_arn(region, policy_name):
    """Return the ARN of the managed policy ``policy_name`` for ``region``.

    The region is first mapped to its ARN suffix by get_policy_arn_suffix;
    that suffix and the policy name are then substituted, in that order,
    into POLICY_ARN_PATTERN.
    """
    return POLICY_ARN_PATTERN.format(
        get_policy_arn_suffix(region),
        policy_name,
    )
# Method to parse the arguments to get the region value
def get_region(session, parsed_globals):
    """Return the region to use for this invocation.

    The ``region`` attribute of ``parsed_globals`` takes precedence. Only
    when it is None is the session's 'region' config variable consulted,
    which may itself be None.
    """
    explicit_region = parsed_globals.region
    if explicit_region is not None:
        return explicit_region
    return session.get_config_variable('region')
class CreateDefaultRole(BasicCommand):
    """The ``create-default-role`` command for the Lifecycle service.

    Creates the default IAM role for the selected resource type and
    attaches the matching managed policy to it. Nothing is created, and
    nothing is displayed, when the role already exists or when the managed
    policy cannot be found.
    """

    NAME = "create-default-role"

    DESCRIPTION = (
        'Creates the default IAM role  which will be used by '
        'Lifecycle service.\n'
        'If the role does not exist, create-default-role will '
        'automatically create it and set its policy. '
        'If the role has been already created, create-default-role '
        'will not update its policy.\n'
    )

    ARG_TABLE = [
        {
            'name': 'iam-endpoint',
            'no_paramfile': True,
            'help_text': (
                '<p>The IAM endpoint to call for creating the roles. '
                'This is optional and should only be specified when a custom '
                'endpoint should be called for IAM operations.</p>'
            ),
        },
        {
            'name': 'resource-type',
            'default': RESOURCE_TYPE_SNAPSHOT,
            'choices': [RESOURCE_TYPE_SNAPSHOT, RESOURCE_TYPE_IMAGE],
            'help_text': (
                "<p>The resource type for which the role needs to be created. "
                "The available options are '%s' and '%s'. "
                "This parameter defaults to '%s'.</p>"
                % (
                    RESOURCE_TYPE_SNAPSHOT,
                    RESOURCE_TYPE_IMAGE,
                    RESOURCE_TYPE_SNAPSHOT,
                )
            ),
        },
    ]

    def __init__(self, session):
        """Initialise the command with the given CLI session."""
        super(CreateDefaultRole, self).__init__(session)

    def _run_main(self, parsed_args, parsed_globals):
        """Run the command: set up the IAM client, create, then display.

        Stores the resolved region, the optional IAM endpoint, the chosen
        resource type and the wrapped IAM client on the instance, creates
        the default role if required and passes the outcome to
        display_response.

        Returns 0 whenever no exception is raised, including when nothing
        was created.
        """
        self._region = get_region(self._session, parsed_globals)
        self._endpoint_url = parsed_args.iam_endpoint
        self._resource_type = parsed_args.resource_type

        raw_iam_client = self._session.create_client(
            'iam',
            region_name=self._region,
            endpoint_url=self._endpoint_url,
            verify=parsed_globals.verify_ssl,
        )
        self._iam_client = IAM(raw_iam_client)

        outcome = self._create_default_role_if_not_exists(parsed_globals)
        # display_response ignores a None outcome, so nothing is shown when
        # no role was created.
        display_response(
            self._session, 'create_role', outcome, parsed_globals)
        return 0

    def _create_default_role_if_not_exists(self, parsed_globals):
        """Create the default lifecycle role unless it already exists.

        Returns the create-role response merged with the managed policy
        description (see _construct_result), or None when the role already
        exists or the managed policy does not exist.

        Raises ValueError when no region can be determined.
        """
        resource_settings = RESOURCES[self._resource_type]
        role_name = resource_settings['default_role_name']

        if self._iam_client.check_if_role_exists(role_name):
            LOG.debug('Role %s exists', role_name)
            return None

        LOG.debug(
            'Role %s does not exist. Creating default role for Lifecycle',
            role_name,
        )

        # The region is needed to build the managed policy ARN.
        region = get_region(self._session, parsed_globals)
        if region is None:
            raise ValueError(
                'You must specify a region. You can also configure your '
                'region by running "aws configure".'
            )

        managed_policy_arn = get_policy_arn(
            region, resource_settings['default_policy_name'])

        # Without the managed policy there is nothing to attach, so the
        # role is not created either.
        if not self._iam_client.check_if_policy_exists(managed_policy_arn):
            LOG.debug('Managed Policy %s does not exist.', managed_policy_arn)
            return None
        LOG.debug('Managed Policy %s exists.', managed_policy_arn)

        # Create the role first, then attach the managed policy to it.
        create_role_response = self._iam_client.create_role_with_trust_policy(
            role_name, LIFECYCLE_DEFAULT_ROLE_ASSUME_POLICY)
        self._iam_client.attach_policy_to_role(managed_policy_arn, role_name)

        # Report both the new role and the policy that was attached.
        get_policy_response = self._iam_client.get_policy(managed_policy_arn)
        return _construct_result(create_role_response, get_policy_response)