nubia/internal/interactive.py (163 lines of code) (raw):

#!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. # import logging import os from typing import Any, List, Tuple from prompt_toolkit import PromptSession from prompt_toolkit.auto_suggest import AutoSuggestFromHistory from prompt_toolkit.completion import Completer from prompt_toolkit.document import Document from prompt_toolkit.enums import EditingMode from prompt_toolkit.formatted_text import PygmentsTokens from prompt_toolkit.history import FileHistory, InMemoryHistory from prompt_toolkit.layout.processors import HighlightMatchingBracketProcessor from prompt_toolkit.lexers import PygmentsLexer from prompt_toolkit.patch_stdout import patch_stdout from termcolor import cprint from nubia.internal.helpers import catchall, find_approx, suggestions_msg, try_await from nubia.internal.io.eventbus import Listener from nubia.internal.options import Options from nubia.internal.ui.lexer import NubiaLexer from nubia.internal.ui.style import shell_style def split_command(text): return text.lstrip(" ").split(" ", 1) class IOLoop(Listener): def __init__(self, context, plugin, usagelogger, options: Options): self._ctx = context self._command_registry = self._ctx.registry self._plugin = plugin self._options = options self._blacklist = self._plugin.getBlacklistPlugin() self._status_bar = self._plugin.get_status_bar(context) self._completer = ShellCompleter(self._command_registry) self._command_registry.register_listener(self) self._usagelogger = usagelogger def _build_cli(self): if self._options.persistent_history: history = FileHistory( os.path.join( os.path.expanduser("~"), ".{}_history".format(self._ctx.binary_name) ) ) else: history = InMemoryHistory() # If EDITOR does not exist, take EMACS # if it does, try fit the EMACS/VI pattern using upper editor = getattr( EditingMode, os.environ.get("EDITOR", "EMACS").upper(), EditingMode.EMACS ) return PromptSession( history=history, auto_suggest=AutoSuggestFromHistory(), lexer=PygmentsLexer(NubiaLexer), completer=self._completer, input_processors=[HighlightMatchingBracketProcessor(chars="[](){}")], style=shell_style, bottom_toolbar=self._get_bottom_toolbar, editing_mode=editor, complete_in_thread=True, refresh_interval=1, include_default_pygments_style=False, ) def _get_prompt_tokens(self) -> List[Tuple[Any, str]]: return self._plugin.get_prompt_tokens(self._ctx) def _get_bottom_toolbar(self) -> List[Tuple[Any, str]]: return PygmentsTokens(self._status_bar.get_tokens()) async def parse_and_evaluate(self, input): command_parts = split_command(input) if command_parts and command_parts[0]: cmd = command_parts[0] args = command_parts[1] if len(command_parts) > 1 else None return await self.evaluate_command(cmd, args, input) async def evaluate_command(self, cmd, args, raw): args = args or "" if cmd in self._command_registry: cmd_instance = self._command_registry.find_command(cmd) else: suggestions = find_approx( cmd, self._command_registry.get_all_commands_map() ) if self._options.auto_execute_single_suggestions and len(suggestions) == 1: print() cprint( "Auto-correcting '{}' to '{}'".format(cmd, suggestions[0]), "red", attrs=["bold"], ) cmd_instance = self._command_registry.find_command(suggestions[0]) else: print() cprint( "Unknown Command '{}'{} type `help` to see all " "available commands".format(cmd, suggestions_msg(suggestions)), "red", attrs=["bold"], ) cmd_instance = None if cmd_instance is not None: try: ret = self._blacklist.is_blacklisted(cmd) if ret: return ret except Exception as e: err_message = ( "Blacklist executing failed, " "all commands are available.\n" "{}".format(str(e)) ) cprint(err_message, "red") logging.error(err_message) try: catchall(self._usagelogger.pre_exec) result = await try_await(cmd_instance.run_interactive(cmd, args, raw)) catchall(self._usagelogger.post_exec, cmd, args, result, False) self._status_bar.set_last_command_status(result) return result except NotImplementedError as e: cprint("[NOT IMPLEMENTED]: {}".format(str(e)), "yellow", attrs=["bold"]) # not implemented error code return 99 async def run(self): prompt = self._build_cli() self._status_bar.start() try: while True: try: with patch_stdout(): text = await prompt.prompt_async( PygmentsTokens(self._get_prompt_tokens()), rprompt=PygmentsTokens( self._status_bar.get_rprompt_tokens() ), ) session_logger = self._plugin.get_session_logger(self._ctx) if session_logger: # Commands don't get written to stdout, so we have to # explicitly dump them to the session log. session_logger.log_command(text) with session_logger.patch(): await self.parse_and_evaluate(text) else: await self.parse_and_evaluate(text) except KeyboardInterrupt: pass except EOFError: # Application exiting. pass self._status_bar.stop() async def on_connected(self, *args, **kwargs): pass class ShellCompleter(Completer): def __init__(self, command_registry): super(Completer, self).__init__() self._command_registry = command_registry def get_completions(self, document, complete_event): if document.on_first_line: cmd_and_args = split_command(document.text_before_cursor) # are we the first word? suggest from command names if len(cmd_and_args) > 1: cmd, args = cmd_and_args # pass to the children # let's find the parent command first cmd_instance = self._command_registry.find_command(cmd) if not cmd_instance: return [] return cmd_instance.get_completions( cmd, Document( args, document.cursor_position - len(document.text) + len(args) ), complete_event, ) else: return self._command_registry.get_completions(document, complete_event) return []