awscli/clidriver.py (505 lines of code) (raw):

# Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. import logging import signal import sys import botocore.session from botocore.compat import OrderedDict, copy_kwargs from botocore.exceptions import ( NoCredentialsError, NoRegionError, ProfileNotFound, ) from botocore.history import get_global_history_recorder from awscli import EnvironmentVariables, __version__ from awscli.alias import AliasCommandInjector, AliasLoader from awscli.argparser import ( USAGE, ArgTableArgParser, MainArgParser, ServiceArgParser, ) from awscli.argprocess import unpack_argument from awscli.arguments import ( BooleanArgument, CLIArgument, CustomArgument, ListArgument, UnknownArgumentError, ) from awscli.commands import CLICommand from awscli.compat import get_stderr_text_writer from awscli.formatter import get_formatter from awscli.help import ( OperationHelpCommand, ProviderHelpCommand, ServiceHelpCommand, ) from awscli.plugin import load_plugins from awscli.utils import emit_top_level_args_parsed_event, write_exception from botocore import __version__ as botocore_version from botocore import xform_name LOG = logging.getLogger('awscli.clidriver') LOG_FORMAT = ( '%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s' ) HISTORY_RECORDER = get_global_history_recorder() # Don't remove this line. The idna encoding # is used by getaddrinfo when dealing with unicode hostnames, # and in some cases, there appears to be a race condition # where threads will get a LookupError on getaddrinfo() saying # that the encoding doesn't exist. Using the idna encoding before # running any CLI code (and any threads it may create) ensures that # the encodings.idna is imported and registered in the codecs registry, # which will stop the LookupErrors from happening. # See: https://bugs.python.org/issue29288 ''.encode('idna') def main(): driver = create_clidriver() rc = driver.main() HISTORY_RECORDER.record('CLI_RC', rc, 'CLI') return rc def create_clidriver(): session = botocore.session.Session(EnvironmentVariables) _set_user_agent_for_session(session) load_plugins( session.full_config.get('plugins', {}), event_hooks=session.get_component('event_emitter'), ) driver = CLIDriver(session=session) return driver def _set_user_agent_for_session(session): session.user_agent_name = 'aws-cli' session.user_agent_version = __version__ session.user_agent_extra = 'botocore/%s' % botocore_version class CLIDriver: def __init__(self, session=None): if session is None: self.session = botocore.session.get_session(EnvironmentVariables) _set_user_agent_for_session(self.session) else: self.session = session self._cli_data = None self._command_table = None self._argument_table = None self.alias_loader = AliasLoader() def _get_cli_data(self): # Not crazy about this but the data in here is needed in # several places (e.g. MainArgParser, ProviderHelp) so # we load it here once. if self._cli_data is None: self._cli_data = self.session.get_data('cli') return self._cli_data def _get_command_table(self): if self._command_table is None: self._command_table = self._build_command_table() return self._command_table def _get_argument_table(self): if self._argument_table is None: self._argument_table = self._build_argument_table() return self._argument_table def _build_command_table(self): """ Create the main parser to handle the global arguments. :rtype: ``argparser.ArgumentParser`` :return: The parser object """ command_table = self._build_builtin_commands(self.session) self.session.emit( 'building-command-table.main', command_table=command_table, session=self.session, command_object=self, ) return command_table def _build_builtin_commands(self, session): commands = OrderedDict() services = session.get_available_services() for service_name in services: commands[service_name] = ServiceCommand( cli_name=service_name, session=self.session, service_name=service_name, ) return commands def _add_aliases(self, command_table, parser): injector = AliasCommandInjector(self.session, self.alias_loader) injector.inject_aliases(command_table, parser) def _build_argument_table(self): argument_table = OrderedDict() cli_data = self._get_cli_data() cli_arguments = cli_data.get('options', None) for option in cli_arguments: option_params = copy_kwargs(cli_arguments[option]) cli_argument = self._create_cli_argument(option, option_params) cli_argument.add_to_arg_table(argument_table) # Then the final step is to send out an event so handlers # can add extra arguments or modify existing arguments. self.session.emit( 'building-top-level-params', argument_table=argument_table ) return argument_table def _create_cli_argument(self, option_name, option_params): return CustomArgument( option_name, help_text=option_params.get('help', ''), dest=option_params.get('dest'), default=option_params.get('default'), action=option_params.get('action'), required=option_params.get('required'), choices=option_params.get('choices'), cli_type_name=option_params.get('type'), ) def create_help_command(self): cli_data = self._get_cli_data() return ProviderHelpCommand( self.session, self._get_command_table(), self._get_argument_table(), cli_data.get('description', None), cli_data.get('synopsis', None), cli_data.get('help_usage', None), ) def _create_parser(self, command_table): # Also add a 'help' command. command_table['help'] = self.create_help_command() cli_data = self._get_cli_data() parser = MainArgParser( command_table, self.session.user_agent(), cli_data.get('description', None), self._get_argument_table(), prog="aws", ) return parser def main(self, args=None): """ :param args: List of arguments, with the 'aws' removed. For example, the command "aws s3 list-objects --bucket foo" will have an args list of ``['s3', 'list-objects', '--bucket', 'foo']``. """ if args is None: args = sys.argv[1:] command_table = self._get_command_table() parser = self._create_parser(command_table) self._add_aliases(command_table, parser) parsed_args, remaining = parser.parse_known_args(args) try: # Because _handle_top_level_args emits events, it's possible # that exceptions can be raised, which should have the same # general exception handling logic as calling into the # command table. This is why it's in the try/except clause. self._handle_top_level_args(parsed_args) self._emit_session_event(parsed_args) HISTORY_RECORDER.record( 'CLI_VERSION', self.session.user_agent(), 'CLI' ) HISTORY_RECORDER.record('CLI_ARGUMENTS', args, 'CLI') return command_table[parsed_args.command](remaining, parsed_args) except UnknownArgumentError as e: sys.stderr.write("usage: %s\n" % USAGE) sys.stderr.write(str(e)) sys.stderr.write("\n") return 255 except NoRegionError as e: msg = ( '%s You can also configure your region by running ' '"aws configure".' % e ) self._show_error(msg) return 255 except NoCredentialsError as e: msg = ( f'{e}. You can configure credentials by running "aws configure".' ) self._show_error(msg) return 255 except KeyboardInterrupt: # Shell standard for signals that terminate # the process is to return 128 + signum, in this case # SIGINT=2, so we'll have an RC of 130. sys.stdout.write("\n") return 128 + signal.SIGINT except Exception as e: LOG.debug("Exception caught in main()", exc_info=True) LOG.debug("Exiting with rc 255") write_exception(e, outfile=get_stderr_text_writer()) return 255 def _emit_session_event(self, parsed_args): # This event is guaranteed to run after the session has been # initialized and a profile has been set. This was previously # problematic because if something in CLIDriver caused the # session components to be reset (such as session.profile = foo) # then all the prior registered components would be removed. self.session.emit( 'session-initialized', session=self.session, parsed_args=parsed_args, ) def _show_error(self, msg): LOG.debug(msg, exc_info=True) sys.stderr.write(msg) sys.stderr.write('\n') def _handle_top_level_args(self, args): emit_top_level_args_parsed_event(self.session, args) if args.profile: self.session.set_config_variable('profile', args.profile) if args.region: self.session.set_config_variable('region', args.region) if args.debug: # TODO: # Unfortunately, by setting debug mode here, we miss out # on all of the debug events prior to this such as the # loading of plugins, etc. self.session.set_stream_logger( 'botocore', logging.DEBUG, format_string=LOG_FORMAT ) self.session.set_stream_logger( 'awscli', logging.DEBUG, format_string=LOG_FORMAT ) self.session.set_stream_logger( 's3transfer', logging.DEBUG, format_string=LOG_FORMAT ) self.session.set_stream_logger( 'urllib3', logging.DEBUG, format_string=LOG_FORMAT ) LOG.debug("CLI version: %s", self.session.user_agent()) LOG.debug("Arguments entered to CLI: %s", sys.argv[1:]) else: self.session.set_stream_logger( logger_name='awscli', log_level=logging.ERROR ) class ServiceCommand(CLICommand): """A service command for the CLI. For example, ``aws ec2 ...`` we'd create a ServiceCommand object that represents the ec2 service. """ def __init__(self, cli_name, session, service_name=None): # The cli_name is the name the user types, the name we show # in doc, etc. # The service_name is the name we used internally with botocore. # For example, we have the 's3api' as the cli_name for the service # but this is actually bound to the 's3' service name in botocore, # i.e. we load s3.json from the botocore data dir. Most of # the time these are the same thing but in the case of renames, # we want users/external things to be able to rename the cli name # but *not* the service name, as this has to be exactly what # botocore expects. self._name = cli_name self.session = session self._command_table = None if service_name is None: # Then default to using the cli name. self._service_name = cli_name else: self._service_name = service_name self._lineage = [self] self._service_model = None @property def name(self): return self._name @name.setter def name(self, value): self._name = value @property def service_model(self): return self._get_service_model() @property def lineage(self): return self._lineage @lineage.setter def lineage(self, value): self._lineage = value def _get_command_table(self): if self._command_table is None: self._command_table = self._create_command_table() return self._command_table def _get_service_model(self): if self._service_model is None: try: api_version = self.session.get_config_variable( 'api_versions' ).get(self._service_name, None) except ProfileNotFound: api_version = None self._service_model = self.session.get_service_model( self._service_name, api_version=api_version ) return self._service_model def __call__(self, args, parsed_globals): # Once we know we're trying to call a service for this operation # we can go ahead and create the parser for it. We # can also grab the Service object from botocore. service_parser = self._create_parser() parsed_args, remaining = service_parser.parse_known_args(args) command_table = self._get_command_table() return command_table[parsed_args.operation](remaining, parsed_globals) def _create_command_table(self): command_table = OrderedDict() service_model = self._get_service_model() for operation_name in service_model.operation_names: cli_name = xform_name(operation_name, '-') operation_model = service_model.operation_model(operation_name) command_table[cli_name] = ServiceOperation( name=cli_name, parent_name=self._name, session=self.session, operation_model=operation_model, operation_caller=CLIOperationCaller(self.session), ) self.session.emit( f'building-command-table.{self._name}', command_table=command_table, session=self.session, command_object=self, ) self._add_lineage(command_table) return command_table def _add_lineage(self, command_table): for command in command_table: command_obj = command_table[command] command_obj.lineage = self.lineage + [command_obj] def create_help_command(self): command_table = self._get_command_table() return ServiceHelpCommand( session=self.session, obj=self._get_service_model(), command_table=command_table, arg_table=None, event_class='.'.join(self.lineage_names), name=self._name, ) def _create_parser(self): command_table = self._get_command_table() # Also add a 'help' command. command_table['help'] = self.create_help_command() return ServiceArgParser( operations_table=command_table, service_name=self._name ) class ServiceOperation: """A single operation of a service. This class represents a single operation for a service, for example ``ec2.DescribeInstances``. """ ARG_TYPES = { 'list': ListArgument, 'boolean': BooleanArgument, } DEFAULT_ARG_CLASS = CLIArgument def __init__( self, name, parent_name, operation_caller, operation_model, session ): """ :type name: str :param name: The name of the operation/subcommand. :type parent_name: str :param parent_name: The name of the parent command. :type operation_model: ``botocore.model.OperationModel`` :param operation_object: The operation model associated with this subcommand. :type operation_caller: ``CLIOperationCaller`` :param operation_caller: An object that can properly call the operation. :type session: ``botocore.session.Session`` :param session: The session object. """ self._arg_table = None self._name = name # These is used so we can figure out what the proper event # name should be <parent name>.<name>. self._parent_name = parent_name self._operation_caller = operation_caller self._lineage = [self] self._operation_model = operation_model self._session = session if operation_model.deprecated: self._UNDOCUMENTED = True @property def name(self): return self._name @name.setter def name(self, value): self._name = value @property def lineage(self): return self._lineage @lineage.setter def lineage(self, value): self._lineage = value @property def lineage_names(self): # Represents the lineage of a command in terms of command ``name`` return [cmd.name for cmd in self.lineage] @property def arg_table(self): if self._arg_table is None: self._arg_table = self._create_argument_table() return self._arg_table def __call__(self, args, parsed_globals): # Once we know we're trying to call a particular operation # of a service we can go ahead and load the parameters. event = ( 'before-building-argument-table-parser.' f'{self._parent_name}.{self._name}' ) self._emit( event, argument_table=self.arg_table, args=args, session=self._session, parsed_globals=parsed_globals, ) operation_parser = self._create_operation_parser(self.arg_table) self._add_help(operation_parser) parsed_args, remaining = operation_parser.parse_known_args(args) if parsed_args.help == 'help': op_help = self.create_help_command() return op_help(remaining, parsed_globals) elif parsed_args.help: remaining.append(parsed_args.help) if remaining: raise UnknownArgumentError( f"Unknown options: {', '.join(remaining)}" ) event = f'operation-args-parsed.{self._parent_name}.{self._name}' self._emit( event, parsed_args=parsed_args, parsed_globals=parsed_globals ) call_parameters = self._build_call_parameters( parsed_args, self.arg_table ) event = f'calling-command.{self._parent_name}.{self._name}' override = self._emit_first_non_none_response( event, call_parameters=call_parameters, parsed_args=parsed_args, parsed_globals=parsed_globals, ) # There are two possible values for override. It can be some type # of exception that will be raised if detected or it can represent # the desired return code. Note that a return code of 0 represents # a success. if override is not None: if isinstance(override, Exception): # If the override value provided back is an exception then # raise the exception raise override else: # This is the value usually returned by the ``invoke()`` # method of the operation caller. It represents the return # code of the operation. return override else: # No override value was supplied. return self._operation_caller.invoke( self._operation_model.service_model.service_name, self._operation_model.name, call_parameters, parsed_globals, ) def create_help_command(self): return OperationHelpCommand( self._session, operation_model=self._operation_model, arg_table=self.arg_table, name=self._name, event_class='.'.join(self.lineage_names), ) def _add_help(self, parser): # The 'help' output is processed a little differently from # the operation help because the arg_table has # CLIArguments for values. parser.add_argument('help', nargs='?') def _build_call_parameters(self, args, arg_table): # We need to convert the args specified on the command # line as valid **kwargs we can hand to botocore. service_params = {} # args is an argparse.Namespace object so we're using vars() # so we can iterate over the parsed key/values. parsed_args = vars(args) for arg_object in arg_table.values(): py_name = arg_object.py_name if py_name in parsed_args: value = parsed_args[py_name] value = self._unpack_arg(arg_object, value) arg_object.add_to_params(service_params, value) return service_params def _unpack_arg(self, cli_argument, value): # Unpacks a commandline argument into a Python value by firing the # load-cli-arg.service-name.operation-name event. session = self._session service_name = self._operation_model.service_model.endpoint_prefix operation_name = xform_name(self._name, '-') return unpack_argument( session, service_name, operation_name, cli_argument, value ) def _create_argument_table(self): argument_table = OrderedDict() input_shape = self._operation_model.input_shape required_arguments = [] arg_dict = {} if input_shape is not None: required_arguments = input_shape.required_members arg_dict = input_shape.members for arg_name, arg_shape in arg_dict.items(): cli_arg_name = xform_name(arg_name, '-') arg_class = self.ARG_TYPES.get( arg_shape.type_name, self.DEFAULT_ARG_CLASS ) is_token = arg_shape.metadata.get('idempotencyToken', False) is_required = arg_name in required_arguments and not is_token event_emitter = self._session.get_component('event_emitter') arg_object = arg_class( name=cli_arg_name, argument_model=arg_shape, is_required=is_required, operation_model=self._operation_model, serialized_name=arg_name, event_emitter=event_emitter, ) arg_object.add_to_arg_table(argument_table) LOG.debug(argument_table) self._emit( f'building-argument-table.{self._parent_name}.{self._name}', operation_model=self._operation_model, session=self._session, command=self, argument_table=argument_table, ) return argument_table def _emit(self, name, **kwargs): return self._session.emit(name, **kwargs) def _emit_first_non_none_response(self, name, **kwargs): return self._session.emit_first_non_none_response(name, **kwargs) def _create_operation_parser(self, arg_table): parser = ArgTableArgParser(arg_table) return parser class CLIOperationCaller: """Call an AWS operation and format the response.""" def __init__(self, session): self._session = session def invoke(self, service_name, operation_name, parameters, parsed_globals): """Invoke an operation and format the response. :type service_name: str :param service_name: The name of the service. Note this is the service name, not the endpoint prefix (e.g. ``ses`` not ``email``). :type operation_name: str :param operation_name: The operation name of the service. The casing of the operation name should match the exact casing used by the service, e.g. ``DescribeInstances``, not ``describe-instances`` or ``describe_instances``. :type parameters: dict :param parameters: The parameters for the operation call. Again, these values have the same casing used by the service. :type parsed_globals: Namespace :param parsed_globals: The parsed globals from the command line. :return: None, the result is displayed through a formatter, but no value is returned. """ client = self._session.create_client( service_name, region_name=parsed_globals.region, endpoint_url=parsed_globals.endpoint_url, verify=parsed_globals.verify_ssl, ) response = self._make_client_call( client, operation_name, parameters, parsed_globals ) self._display_response(operation_name, response, parsed_globals) return 0 def _make_client_call( self, client, operation_name, parameters, parsed_globals ): py_operation_name = xform_name(operation_name) if client.can_paginate(py_operation_name) and parsed_globals.paginate: paginator = client.get_paginator(py_operation_name) response = paginator.paginate(**parameters) else: response = getattr(client, xform_name(operation_name))( **parameters ) return response def _display_response(self, command_name, response, parsed_globals): output = parsed_globals.output if output is None: output = self._session.get_config_variable('output') formatter = get_formatter(output, parsed_globals) formatter(command_name, response)