iact3/cli.py (284 lines of code) (raw):

import argparse import importlib import inspect import logging import sys import types from asyncio import iscoroutine from typing import List from iact3.util import exit_with_code LOG = logging.getLogger(__name__) class CustomParser(argparse.ArgumentParser): def error(self, message): sys.stderr.write(f'error: {message}\n') self.print_help() sys.exit(2) def _get_log_level(args, exit_func=exit_with_code): log_level = 'INFO' if ('-d' in args or '--debug' in args) and ('-q' in args or '--quiet' in args): exit_func(1, '--debug and --quiet cannot be specified simultaneously') if '-d' in args or '--debug' in args: log_level = 'DEBUG' if '-q' in args or '--quiet' in args: log_level = 'ERROR' return log_level class SetVerbosity(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): LOG.setLevel(_get_log_level([option_string])) class GlobalArgs: ARGS = [ [ ['-q', '--quiet'], { 'action': SetVerbosity, 'nargs': 0, 'help': 'reduce output to the minimum', 'dest': '_quiet', }, ], [ ['-d', '--debug'], { 'action': SetVerbosity, 'nargs': 0, 'help': 'adds debug output and tracebacks', 'dest': '_debug', }, ], [ ['--profile'], { 'help': 'set the default profile used.', 'dest': '_profile' } ], [ ['--log-prefix'], { 'help': 'set the log prefix.', 'dest': '_log_prefix' } ] ] def __init__(self): self._profile = 'default' self._log_prefix = '' @property def profile(self): return self._profile @profile.setter def profile(self, profile): self._profile = profile @property def log_prefix(self): return self._log_prefix @log_prefix.setter def log_prefix(self, log_prefix): self._log_prefix = log_prefix GLOBAL_ARGS = GlobalArgs() class CliCore: USAGE = '{program}{global_opts}{command}{command_opts}{subcommand}{subcommand_opts}' longform_required: List = [] @classmethod def longform_param_required(cls, param_name): def wrapper(command_func): formatted_param = param_name.lower().replace('_', '-') qualname = command_func.__qualname__.replace('.__init__', '') cls.longform_required.append(f'{qualname}.{formatted_param}') return command_func return wrapper def __init__(self, program_name, module_package, description, version=None, args=None): self.name = program_name self.module_package = module_package self._modules = self._get_plugin_modules() self.args = {'global': args if args is not None else [], 'commands': {}} self._build_args() self.command_parser = None self.subcommand_parsers = {} self.parser = self._build_parser(description, version) self.parsed_args = [] def _build_args(self): for name, module in self._modules.items(): params = self._get_params(module) self.args['commands'][name] = {'args': params, 'subcommands': {}} for method_name, method_function in self._get_class_methods(module): if not method_name.startswith('_'): params = self._get_params(method_function) self.args['commands'][name]['subcommands'][method_name] = params def _init_config(self): pass @staticmethod def _get_class_methods(module): methods = inspect.getmembers(module, predicate=inspect.isfunction) return [method for method in methods if not method[0].startswith('_')] def _get_params(self, item): params = [] for param in inspect.signature(item).parameters.values(): if param.name == 'self' or param.name.startswith('_'): continue required = param.default == param.empty default = param.default if not required else None val_type = param.annotation if param.annotation in [str, int, bool] else str action = 'store_true' if val_type == bool else 'store' param_help = CliCore._get_param_help(item, param.name) name = param.name.lower() kwargs = {'action': action, 'help': param_help} if not required: name = name.replace('_', '-') kwargs.update( {'required': required, 'default': default, 'dest': param.name} ) if action == 'store': kwargs.update({'type': val_type}) if required: params.append([[name], kwargs]) else: if f'{item.__qualname__}.{name}' in self.longform_required: params.append([[f'--{name}'], kwargs]) else: params.append([[f'-{name[0]}', f'--{name}'], kwargs]) return params @staticmethod def _get_param_help(item, param): help_str = '' docstring = ( item.__doc__ if isinstance(item, types.FunctionType) else item.__init__.__doc__ ) if docstring is None: return help_str for line in docstring.split('\n'): if line.strip().startswith(f':param {param}:'): help_str = line.strip()[len(f':param {param}:'):].strip() break return help_str @staticmethod def _get_help(item): help_str = '' if item.__doc__ is None: return help_str for line in item.__doc__.split('\n'): if not line.strip().startswith(':'): help_str += line.strip() return help_str.strip() def _get_command_help(self, commands): help_str = '' for name, mod in commands.items(): mod_help = self._get_help(mod) if not mod_help: help_str += f'{name}\n' else: help_str += f'{name} - {mod_help}\n' return help_str.strip() def _add_sub_parser(self, usage, description, mod, parser, args): sub_parser = parser.add_parser( mod, usage=usage, description=description, formatter_class=argparse.RawDescriptionHelpFormatter, ) self._add_arguments(args, sub_parser) return sub_parser @staticmethod def _add_arguments(input_args, parser): for args, kwargs in input_args: parser.add_argument(*args, **kwargs) @staticmethod def _add_sub(parser, **kwargs): if sys.version_info[1] != 6 or 'required' not in kwargs: return parser.add_subparsers(**kwargs) required = kwargs['required'] kwargs.pop('required') sub = parser.add_subparsers(**kwargs) sub.required = required return sub def _build_parser(self, description, version): parser = CustomParser( description=description, usage=self._build_usage(), formatter_class=argparse.RawDescriptionHelpFormatter, ) if version: parser.add_argument('-v', '--version', action='version', version=version) self._add_arguments(self.args['global'], parser) description = self._get_command_help(self._modules) command_parser = self._add_sub( parser=parser, title='commands', description=description, required=True, metavar='', dest='_command', ) self.command_parser = command_parser for mod in self._modules: usage = self._build_usage({'command': mod}) description = self._get_help(self._modules[mod]) mod_parser = self._add_sub_parser(usage, description, mod, command_parser, self.args['commands'][mod]['args']) self.subcommand_parsers[mod] = mod_parser subcommands = self.args['commands'][mod]['subcommands'] if subcommands: class_methods = { m[0]: m[1] for m in self._get_class_methods(self._modules[mod]) } description = self._get_command_help(class_methods) subcommand_parser = self._add_sub( parser=mod_parser, title='subcommands', description=description, required=True, metavar='', dest='_subcommand', ) for subcommand_name, subcommand_args in subcommands.items(): usage = self._build_usage({'subcommand': subcommand_name}) description = self._get_help(class_methods[subcommand_name]) self._add_sub_parser(usage, description, subcommand_name, subcommand_parser, subcommand_args) return parser def _build_usage(self, args=None): args = args if args is not None else {} args['program'] = self.name if 'command' not in args: args['command'] = '<command>' if 'subcommand' not in args: args['subcommand'] = '[subcommand]' if 'global_opts' not in args: args['global_opts'] = '[args]' if 'command_opts' not in args: args['command_opts'] = '[args]' if 'subcommand_opts' not in args: args['subcommand_opts'] = '[args]' for key, val in args.items(): if val and not val.endswith(' '): args[key] = f'{val} ' return self.USAGE.format(**args) def _get_plugin_modules(self): members = inspect.getmembers(self.module_package, predicate=inspect.isclass) member_name_class = [] for name, cls in members: member_name_class.append((name, cls)) return {name.lower(): cls for name, cls in member_name_class} @staticmethod def _import_plugin_module(class_name, module_name): return getattr(importlib.import_module(module_name), class_name) def parse(self, args=None): if not args: args = [] self.parsed_args = self.parser.parse_args(args) return self.parsed_args async def run(self): args = self.parsed_args.__dict__ command = self._modules[args['_command']] subcommand = '' if '_subcommand' in args: subcommand = args['_subcommand'] args = {k: v for k, v in args.items() if not k.startswith('_')} if not subcommand: return await command.create(**args) ret = getattr(command(), subcommand)(**args) if iscoroutine(ret): return await ret else: return ret