nubia/internal/cmdbase.py (357 lines of code) (raw):
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
import asyncio
import copy
import inspect
import sys
import traceback
from collections import OrderedDict
from textwrap import dedent
from typing import Iterable, Optional
from prompt_toolkit.completion import CompleteEvent, Completion, WordCompleter
from prompt_toolkit.document import Document
from termcolor import cprint
from nubia.internal import parser
from nubia.internal.completion import AutoCommandCompletion
from nubia.internal.exceptions import CommandParseError
from nubia.internal.helpers import (
find_approx,
function_to_str,
suggestions_msg,
try_await,
)
from nubia.internal.options import Options
from nubia.internal.typing import FunctionInspection, inspect_object
from nubia.internal.typing.argparse import (
get_arguments_for_command,
get_arguments_for_inspection,
register_command,
)
from nubia.internal.typing.builder import apply_typing
from nubia.internal.typing.inspect import is_list_type
from . import context
class Command:
"""A Command is the abstraction over one or more commands that will executed
by the shell, A Command sub-class must implement `cmds` with a dict that
maps command to a description.
"""
def __init__(self):
self._command_registry = None
self._built_in = False
@property
def built_in(self) -> bool:
return self._built_in
def set_command_registry(self, command_registry):
self._command_registry = command_registry
async def run_interactive(self, cmd, args, raw):
"""
This function MUST be overridden by all commands. It will be called when
the command is executed in interactive mode.
"""
raise NotImplementedError("run_interactive must be overridden")
async def run_cli(self, args):
"""
This function SHOULD be implemented in order to expose a subcommand in
the CLI interface. It will be called when run from the CLI.
"""
pass
async def add_arguments(self, parser):
"""
This function receives an instance of an "argparse.ArgumentParser".
Every command SHOULD use it to tell the CLI interface which options
needs.
"""
# register_command(parser, inspect_object(self._fn))
pass
@property
def metadata(self) -> FunctionInspection:
"""
Returns the command specification as an instance of FunctionInspection
object. This is used to generate a completion model for external
completers
"""
return {}
def get_completions(self, cmd, document, complete_event) -> Iterable[Completion]:
"""
This function SHOULD be implemented to feed the interactive auto
completion of command arguments. Example: auto complete the available
tables in the "describe" query command.
"""
return []
def get_command_names(self):
"""
This function MUST be implemented to tell the framework which commands
this module implements. Must return a list of strings.
"""
raise NotImplementedError("get_command_names must be overridden")
def get_cli_aliases(self):
"""
This function SHOULD be implemented to instruct the command dispatcher
about alternative commands available in the CLI. Example: while the
"commands/query.py" exports "select" and" describe" in interactive
mode, the CLI uses the subcommand "query" to run those commands.
Must return a list of strings.
"""
return []
def get_help(self, cmd, *args):
"""
This function SHOULD be implemented to show command help when running
':help'. It must return a string associated with the given command.
"""
pass
def get_help_short(self, cmd, *args):
"""Return a shortened help.
This is for example used for interactive autocompletion."""
help = self.get_help(cmd, *args)
return help.split("\n", 1)[0] if help else None
@property
def super_command(self) -> bool:
"""
Does this command parse sub-commands?
"""
return False
def has_subcommand(self, subcommand) -> bool:
"""
Does this command have `subcommand` as a valid sub-command?
"""
return False
class AutoCommand(Command):
def __init__(self, fn, options: Optional[Options] = None):
self._built_in = False
self._fn = fn
self._options = options or Options()
if not callable(fn):
raise ValueError("fn argument must be a callable")
self._obj_metadata = inspect_object(fn)
self._is_super_command = len(self.metadata.subcommands) > 0
self._subcommand_names = []
# We never expect a function to be passed here that has a self argument
# In that case, we should get a bound method
if "self" in self.metadata.arguments and not inspect.ismethod(self._fn):
raise ValueError(
"Expecting either a function (eg. bar) or "
"a bound method (eg. Foo().bar). "
"You passed what appears to be an unbound method "
"(eg. Foo.bar) it has a 'self' argument: %s" % function_to_str(fn)
)
if not self.metadata.command:
raise ValueError(
"function or class {} needs to be annotated with "
"@command".format(function_to_str(fn))
)
# If this is a super command, we need a completer for sub-commands
if self.super_command:
self._commands_completer = WordCompleter(
[], ignore_case=True, sentence=True
)
for _, inspection in self.metadata.subcommands:
_sub_name = inspection.command.name
self._commands_completer.words.append(_sub_name)
self._commands_completer.meta_dict[_sub_name] = dedent(
inspection.command.help
).strip()
self._subcommand_names.append(_sub_name)
@property
def metadata(self) -> FunctionInspection:
"""
The Inspection object of this command. This object contains all the
information required by AutoCommand to understand the command arguments
type information, help messages, aliases, and attributes.
"""
return self._obj_metadata
def _create_subcommand_obj(self, key_values):
"""
Instantiates an object of the super command class, passes the right
arguments and returns a dict with the remaining unused arguments
"""
kwargs = {
k: v
for k, v in get_arguments_for_inspection(self.metadata, key_values).items()
if v is not None
}
remaining = {
k: v
for k, v in key_values.items()
if k.replace("-", "_") not in kwargs.keys()
}
return self._fn(**kwargs), remaining
async def run_interactive(self, cmd, args, raw):
try:
args_metadata = self.metadata.arguments
parsed = parser.parse(args, expect_subcommand=self.super_command)
# prepare args dict
parsed_dict = parsed.asDict()
args_dict = parsed.kv.asDict()
key_values = parsed.kv.asDict()
command_name = cmd
# if this is a super command, we need first to create an instance of
# the class (fn) and pass the right arguments
if self.super_command:
subcommand = parsed_dict.get("__subcommand__")
if not subcommand:
cprint(
"A sub-command must be supplied, valid values: "
"{}".format(", ".join(self._get_subcommands())),
"red",
)
return 2
subcommands = self._get_subcommands()
if subcommand not in subcommands:
suggestions = find_approx(subcommand, subcommands)
if (
len(suggestions) == 1
and self._options.auto_execute_single_suggestions
):
print()
cprint(
"Auto-correcting '{}' to '{}'".format(
subcommand, suggestions[0]
),
"red",
attrs=["bold"],
)
subcommand = suggestions[0]
else:
print()
cprint(
"Invalid sub-command '{}'{} "
"valid sub-commands: {}".format(
subcommand,
suggestions_msg(suggestions),
", ".join(self._get_subcommands()),
),
"red",
attrs=["bold"],
)
return 2
sub_inspection = self.subcommand_metadata(subcommand)
instance, remaining_args = self._create_subcommand_obj(args_dict)
assert instance
args_dict = remaining_args
key_values = copy.copy(args_dict)
args_metadata = sub_inspection.arguments
attrname = self._find_subcommand_attr(subcommand)
command_name = subcommand
assert attrname is not None
fn = getattr(instance, attrname)
else:
# not a super-command, use use the function instead
fn = self._fn
positionals = parsed_dict["positionals"] if parsed.positionals != "" else []
# We only allow positionals for arguments that have positional=True
# ِ We filter out the OrderedDict this way to ensure we don't lose the
# order of the arguments. We also filter out arguments that have
# been passed by name already. The order of the positional arguments
# follows the order of the function definition.
can_be_positional = self._positional_arguments(
args_metadata, args_dict.keys()
)
if len(positionals) > len(can_be_positional):
if len(can_be_positional) == 0:
err = "This command does not support positional arguments"
else:
# We have more positionals than we should
err = (
"This command only supports ({}) positional arguments, "
"namely arguments ({}). You have passed {} arguments ({})"
" instead!"
).format(
len(can_be_positional),
", ".join(can_be_positional.keys()),
len(positionals),
", ".join(str(x) for x in positionals),
)
cprint(err, "red")
return 2
# constuct key_value dict from positional arguments.
args_from_positionals = {
key: value for value, key in zip(positionals, can_be_positional)
}
# update the total arguments dict with the positionals
args_dict.update(args_from_positionals)
# Run some validations on number of arguments provided
# do we have keys that are supplied in both positionals and
# key_value style?
duplicate_keys = set(args_from_positionals.keys()).intersection(
set(key_values.keys())
)
if duplicate_keys:
cprint(
"Arguments '{}' have been passed already, cannot have"
" duplicate keys".format(list(duplicate_keys)),
"red",
)
return 2
# check for verbosity override in kwargs
ctx = context.get_context()
old_verbose = ctx.args.verbose
if "verbose" in args_dict:
ctx.set_verbose(args_dict["verbose"])
del args_dict["verbose"]
del key_values["verbose"]
# do we have keys that we know nothing about?
extra_keys = set(args_dict.keys()) - set(args_metadata)
if extra_keys:
cprint(
f"Unknown argument(s) {sorted(extra_keys)} were passed",
"magenta",
)
return 2
# is there any required keys that were not resolved from positionals
# nor key_values?
missing_keys = set(args_metadata) - set(args_dict.keys())
if missing_keys:
required_missing = []
for key in missing_keys:
if not args_metadata[key].default_value_set:
required_missing.append(key)
if required_missing:
cprint(
"Missing required argument(s) {} for command"
" {}".format(required_missing, command_name),
"yellow",
)
return 3
# convert expected types for arguments
for key, value in args_dict.items():
target_type = args_metadata[key].type
if target_type is None:
target_type = str
try:
new_value = apply_typing(value, target_type)
except ValueError:
fn_name = function_to_str(target_type, False, False)
cprint(
'Cannot convert value "{}" to {} on argument {}'.format(
value, fn_name, key
),
"yellow",
)
return 4
else:
args_dict[key] = new_value
# Validate that arguments with `choices` are supplied with the
# acceptable values.
for arg, value in args_dict.items():
choices = args_metadata[arg].choices
if choices:
# Validate the choices in the case of values and list of
# values.
if is_list_type(args_metadata[arg].type):
bad_inputs = [v for v in value if v not in choices]
if bad_inputs:
cprint(
f"Argument '{arg}' got an unexpected "
f"value(s) '{bad_inputs}'. Expected one "
f"or more of {choices}.",
"red",
)
return 4
elif value not in choices:
cprint(
f"Argument '{arg}' got an unexpected value "
f"'{value}'. Expected one of "
f"{choices}.",
"red",
)
return 4
# arguments appear to be fine, time to run the function
try:
# convert argument names back to match the function signature
args_dict = {args_metadata[k].arg: v for k, v in args_dict.items()}
ret = await try_await(fn(**args_dict))
ctx.set_verbose(old_verbose)
except Exception as e:
cprint("Error running command: {}".format(str(e)), "red")
cprint("-" * 60, "yellow")
traceback.print_exc(file=sys.stderr)
cprint("-" * 60, "yellow")
return 1
return ret
except CommandParseError as e:
cprint("Error parsing command", "red")
cprint(cmd + " " + args, "white", attrs=["bold"])
cprint((" " * (e.col + len(cmd))) + "^", "white", attrs=["bold"])
cprint(str(e), "yellow")
return 1
def _positional_arguments(self, args_metadata, filter_out):
positionals = OrderedDict()
for k, v in args_metadata.items():
if v.positional and k not in filter_out:
positionals[k] = v
return positionals
def subcommand_metadata(self, name: str) -> FunctionInspection:
assert self.super_command
subcommands = self.metadata.subcommands
for _, inspection in subcommands:
if inspection.command.name == name:
return inspection
def _find_subcommand_attr(self, name):
assert self.super_command
subcommands = self.metadata.subcommands
for attr, inspection in subcommands:
if inspection.command.name == name or name in inspection.command.aliases:
return attr
# be explicit about returning None for readability
return None
def _get_subcommands(self) -> Iterable[str]:
assert self.super_command
return [inspection.command.name for _, inspection in self.metadata.subcommands]
def _kwargs_for_fn(self, fn, args):
return {
k: v
for k, v in get_arguments_for_command(fn, args).items()
if v is not None
}
async def run_cli(self, args):
# if this is a super-command, we need to dispatch the call to the
# correct function
kwargs = self._kwargs_for_fn(self._fn, args)
try:
if self._is_super_command:
# let's instantiate an instance of the klass
instance = self._fn(**kwargs)
# we need to find the actual method we want to call, in addition to
# this we need to extract the correct kwargs for this method
# find which function it is in the sub commands
attrname = self._find_subcommand_attr(args._subcmd)
assert attrname is not None
fn = getattr(instance, attrname)
kwargs = self._kwargs_for_fn(fn, args)
else:
fn = self._fn
ret = await try_await(fn(**kwargs))
return ret
except Exception as e:
cprint("Error running command: {}".format(str(e)), "red")
cprint("-" * 60, "yellow")
traceback.print_exc(file=sys.stderr)
cprint("-" * 60, "yellow")
return 1
@property
def super_command(self):
return self._is_super_command
def has_subcommand(self, subcommand):
assert self.super_command
return subcommand.lower() in self._subcommand_names
async def add_arguments(self, parser):
register_command(parser, self.metadata)
def get_command_names(self):
command = self.metadata.command
return [command.name] + command.aliases
def get_completions(
self, _: str, document: Document, complete_event: CompleteEvent
) -> Iterable[Completion]:
if self._is_super_command:
exploded = document.text.lstrip().split(" ", 1)
# Are we at the first word? we expect a sub-command here
if len(exploded) <= 1:
return self._commands_completer.get_completions(
document, complete_event
)
state_machine = AutoCommandCompletion(self, document, complete_event)
return state_machine.get_completions()
def get_help(self, cmd, *args):
help = self.metadata.command.help
return dedent(help).strip() if help else None