nubia/internal/completion.py (192 lines of code) (raw):
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
import itertools
import logging
from typing import TYPE_CHECKING, Iterable
import pyparsing as pp
from prompt_toolkit.completion import CompleteEvent, Completion
from prompt_toolkit.document import Document
from nubia.internal import parser
from nubia.internal.helpers import function_to_str
if TYPE_CHECKING:
from nubia.internal.cmdbase import AutoCommand # noqa
class TokenParse:
"""
This class captures an interactive shell token that cannot be fully parser
by the interactive shell parser and analyze it.
"""
def __init__(self, token: str) -> None:
self._token = token
self._key = ""
self._is_argument = False
self._is_list = False
self._is_dict = False
self._last_value = ""
self.parse()
def parse(self):
key, delim, value = self._token.partition("=")
# Is everything before the = sane?
if any(x in key for x in "[]{}\"'"):
# We will treat this as positional in this case
return
# This is key=value
if delim == "=":
self._is_argument = True
self._key = key
else:
# This is positional, the value is the key
value = self._key
assert len(value) == 0
if len(value) > 0:
# Let's parse the value, is it a single, list, dict?
if value[0] == "[":
self._is_list = True
value = value.strip("[")
list_values = value.rpartition(",")
self._last_value = list_values[len(list_values) - 1].lstrip()
elif value[0] == "{":
self._is_dict = True
else:
self._last_value = value
@property
def is_argument(self) -> bool:
return self._is_argument
@property
def is_positional(self) -> bool:
return not self._is_argument
# Talks about the type of the value
@property
def is_list(self) -> bool:
return self._is_list
@property
def is_dict(self) -> bool:
return self._is_dict
@property
def argument_name(self) -> str:
assert self._is_argument
return self._key
def keys(self) -> Iterable[str]:
return []
def values(self) -> Iterable[str]:
return []
@property
def last_value(self) -> str:
return self._last_value
@property
def is_single_value(self) -> bool:
return not (self._is_dict or self._is_list)
class AutoCommandCompletion:
"""
This is the interactive completion state machine, it tracks the
parsed tokens out of a command input and builds a data model that is
used to understand what would be the next natural completion
token(s).
"""
def __init__(
self,
cmd_obj: "AutoCommand",
document: Document,
complete_event: CompleteEvent,
) -> None:
self.doc = document
self.cmd = cmd_obj
self.meta = self.cmd.metadata
self.event = complete_event
# current state
def get_completions(self) -> Iterable[Completion]:
"""
Returns a
"""
logger = logging.getLogger(f"{type(self).__name__}.get_completions")
remaining = None
try:
parsed = parser.parse(
self.doc.text, expect_subcommand=self.cmd.super_command
)
except parser.CommandParseError as e:
parsed = e.partial_result
remaining = e.remaining
# This is a funky but reliable way to figure that last token we are
# interested in manually parsing, This will return the last key=value
# including if the value is a 'value', [list], or {dict} or combination
# of these. This also matches positional arguments.
if self.doc.char_before_cursor in " ]}":
last_token = ""
else:
last_space = self.doc.find_backwards(" ", in_current_line=True) or -1
last_token = self.doc.text[(last_space + 1) :] # noqa
# We pick the bigger match here. The reason we want to look into
# remaining is to capture the state that we are in an open list,
# dictionary, or any other value that may have spaces in it but fails
# parsing (yet).
if remaining and len(remaining) > len(last_token):
last_token = remaining
try:
return self._prepare_args_completions(
parsed_command=parsed, last_token=last_token
)
except Exception as e:
logger.exception(str(e))
return []
def _prepare_args_completions(
self, parsed_command: pp.ParseResults, last_token
) -> Iterable[Completion]:
assert parsed_command is not None
args_meta = self.meta.arguments.values()
# are we expecting a sub command?
if self.cmd.super_command:
# We have a sub-command (supposedly)
subcommand = parsed_command.get("__subcommand__")
assert subcommand
sub_meta = self.cmd.subcommand_metadata(subcommand)
if not sub_meta:
logging.debug("Parsing unknown sub-command failed!")
return []
# we did find the sub-command, yay!
# In this case we chain the arguments from super and the
# sub-command together
args_meta = itertools.chain(args_meta, sub_meta.arguments.values())
# Now let's see if we can figure which argument we are talking about
args_meta = self._filter_arguments_by_prefix(last_token, args_meta)
# Which arguments did we fully parse already? let's avoid printing them
# in completions
parsed_keys = parsed_command.asDict().get("kv", [])
# We are either completing an argument name, argument value, or
# positional value.
# Dissect the last_token and figure what is the right completion
parsed_token = TokenParse(last_token)
if parsed_token.is_positional:
# TODO: Handle positional argument completions too
# To figure which positional we are in right now, we need to run the
# same logic that figures if all required arguments has been
# supplied and how many positionals have been processed and which
# one is next.
# This code is already in cmdbase.py run_interactive but needs to be
# refactored to be reusable here.
pass
elif parsed_token.is_argument:
argument_name = parsed_token.argument_name
arg = self._find_argument_by_name(argument_name)
if not arg or arg.choices in [False, None]:
return []
# TODO: Support dictionary keys/named tuples completion
if parsed_token.is_dict:
return []
# We are completing a value, in this case, we need to get the last
# meaninful piece of the token `x=[Tr` => `Tr`
return [
Completion(
text=str(choice),
start_position=-len(parsed_token.last_value),
)
for choice in arg.choices
if str(choice).lower().startswith(parsed_token.last_value.lower())
]
# We are completing arguments, or positionals.
# TODO: We would like to only show positional choices if we exhaust all
# required arguments. This will make it easier for the user to figure
# that there are still required named arguments. After that point we
# will show optional arguments and positionals as possible completions
ret = [
Completion(
text=arg_meta.name + "=",
start_position=-len(last_token),
display_meta=self._get_arg_help(arg_meta),
)
for arg_meta in args_meta
if arg_meta.name not in parsed_keys
]
return ret
def _filter_arguments_by_prefix(self, prefix: str, arguments=None):
arguments = arguments or self.meta.arguments.values()
if prefix:
return [
arg_meta for arg_meta in arguments if arg_meta.name.startswith(prefix)
]
return arguments
def _prepare_value_completions(self, prefix, partial_result):
parsed_keys = map(lambda x: x[0], partial_result.get("kv", []))
argument, rest = prefix.split("=", 1)
arguments = self._filter_arguments_by_prefix(argument)
if len(arguments) < 1:
return []
if len(arguments) == 1:
argument_obj = self._find_argument_by_name(argument)
assert argument_obj
# was that argument used before?
if argument in parsed_keys:
logging.debug(
"Argument {} was used already, not generating "
"completions".format(argument)
)
return []
return []
def _find_argument_by_name(self, name):
args_meta = list(self.meta.arguments.values())
if self.cmd.super_command:
# We need to get the subcommand name
subcommand_name = self.doc.text.split(" ")[0]
for _, sub in self.meta.subcommands:
if sub.command.name == subcommand_name:
args_meta.extend(list(sub.arguments.values()))
filtered = filter(lambda arg: arg.name == name, args_meta)
return next(filtered, None)
def _get_arg_help(self, arg_meta):
sb = ["["]
if arg_meta.type:
sb.append(function_to_str(arg_meta.type, False, False))
sb.append(", ")
if arg_meta.default_value_set:
sb.append("default: ")
sb.append(arg_meta.default_value)
else:
sb.append("required")
sb.append("] ")
sb.append(
arg_meta.description
if arg_meta.description
else "<no description provided>"
)
return "".join(str(item) for item in sb)