client/commands/pysa_server.py (276 lines of code) (raw):

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""
This is an implementation of Pysa's language server. It is a refactored
version of persistent.py.
"""

import asyncio
import json
import logging
from typing import Union

from .. import (
    json_rpc,
    log,
    command_arguments,
    configuration as configuration_module,
)
from . import (
    commands,
    language_server_protocol as lsp,
    async_server_connection as connection,
    start,
)
from .persistent import (
    LSPEvent,
    _read_lsp_request,
    _log_lsp_event,
    InitializationSuccess,
    InitializationFailure,
    InitializationExit,
    _wait_for_exit,
    process_initialize_request,
)

LOG: logging.Logger = logging.getLogger(__name__)


async def try_initialize(
    input_channel: connection.TextReader,
    output_channel: connection.TextWriter,
) -> Union[InitializationSuccess, InitializationFailure, InitializationExit]:
    """
    Read an LSP message from the input channel and try to initialize an LSP
    server. Also write to the output channel with proper response if the input
    message is a request instead of a notification.

    The function can return one of three possibilities:
    - If the initialization succeeds, return `InitializationSuccess`.
    - If the initialization fails, return `InitializationFailure`. There could
      be many reasons for the failure: The incoming LSP message may not be an
      initialization request. The incoming LSP request may be malformed. Or the
      client may not complete the handshake by sending back an `initialized`
      request.
    - If an exit notification is received, return `InitializationExit`. The LSP
      spec allows exiting a server without a preceding initialize request.
    """
    # Kept outside the `try` so the error handler can tell whether a message
    # was read at all before the failure happened.
    request = None
    try:
        request = await lsp.read_json_rpc(input_channel)
        LOG.debug(f"Received pre-initialization LSP request: {request}")

        request_id = request.id
        if request_id is None:
            # A message without an id is a notification: there is nothing to
            # respond to. Only `exit` is meaningful at this point.
            return (
                InitializationExit()
                if request.method == "exit"
                else InitializationFailure()
            )
        if request.method != "initialize":
            raise lsp.ServerNotInitializedError("An initialize request is needed.")
        request_parameters = request.parameters
        if request_parameters is None:
            raise lsp.ServerNotInitializedError(
                "Missing parameters for initialize request."
            )

        initialize_parameters = lsp.InitializeParameters.from_json_rpc_parameters(
            request_parameters
        )
        result = process_initialize_request(initialize_parameters)
        await lsp.write_json_rpc(
            output_channel,
            # pyre-fixme[16]: Pyre doesn't understand `dataclasses_json`
            json_rpc.SuccessResponse(id=request_id, result=result.to_dict()),
        )

        # The handshake is only complete once the client follows up on our
        # response; a `shutdown` here aborts the handshake instead.
        initialized_notification = await lsp.read_json_rpc(input_channel)
        if initialized_notification.method == "shutdown":
            await _wait_for_exit(input_channel, output_channel)
            return InitializationExit()
        elif initialized_notification.method != "initialized":
            actual_message = json.dumps(initialized_notification.json())
            raise lsp.ServerNotInitializedError(
                "Failed to receive an `initialized` request from client. "
                + f"Got {log.truncate(actual_message, 100)}"
            )

        return InitializationSuccess(
            client_capabilities=initialize_parameters.capabilities,
            client_info=initialize_parameters.client_info,
            initialization_options=initialize_parameters.initialization_options,
        )
    except json_rpc.JSONRPCException as json_rpc_error:
        # Report the failure to the client. The id is unknown if the failure
        # happened while reading the very first message.
        await lsp.write_json_rpc(
            output_channel,
            json_rpc.ErrorResponse(
                id=request.id if request is not None else None,
                code=json_rpc_error.error_code(),
                message=str(json_rpc_error),
                data={"retry": False},
            ),
        )
        return InitializationFailure(exception=json_rpc_error)


class PysaServer:
    """
    Post-initialization request loop of the Pysa language server.

    An instance is created once the LSP handshake has succeeded and then
    serves requests read from `input_channel` until the client exits.
    """

    # I/O Channels
    input_channel: connection.TextReader
    output_channel: connection.TextWriter

    # Immutable States
    client_capabilities: lsp.ClientCapabilities

    def __init__(
        self,
        input_channel: connection.TextReader,
        output_channel: connection.TextWriter,
        client_capabilities: lsp.ClientCapabilities,
        pyre_arguments: start.Arguments,
        binary_location: str,
        server_identifier: str,
    ) -> None:
        """Store the channels, client capabilities and server settings."""
        self.input_channel = input_channel
        self.output_channel = output_channel
        self.client_capabilities = client_capabilities
        self.pyre_arguments = pyre_arguments
        self.binary_location = binary_location
        self.server_identifier = server_identifier

    async def show_message_to_client(
        self, message: str, level: lsp.MessageType = lsp.MessageType.INFO
    ) -> None:
        """Send a `window/showMessage` message with the given text and level."""
        await lsp.write_json_rpc(
            self.output_channel,
            json_rpc.Request(
                method="window/showMessage",
                parameters=json_rpc.ByNameParameters(
                    {"type": int(level), "message": message}
                ),
            ),
        )

    async def log_and_show_message_to_client(
        self, message: str, level: lsp.MessageType = lsp.MessageType.INFO
    ) -> None:
        """
        Log `message` locally at the logging level matching `level` (anything
        other than ERROR/WARNING/INFO is logged at debug level), then forward
        it to the client via `show_message_to_client`.
        """
        if level == lsp.MessageType.ERROR:
            LOG.error(message)
        elif level == lsp.MessageType.WARNING:
            LOG.warning(message)
        elif level == lsp.MessageType.INFO:
            LOG.info(message)
        else:
            LOG.debug(message)
        await self.show_message_to_client(message, level)

    async def wait_for_exit(self) -> int:
        """
        Consume messages after a `shutdown` request until `exit` arrives.

        Returns 0 once the `exit` message is read. Any other message raises
        `json_rpc.InvalidRequestError` inside the `_read_lsp_request` context
        (presumably reported back to the client by that helper, since the loop
        is written to keep going -- confirm in persistent.py).
        """
        while True:
            async with _read_lsp_request(
                self.input_channel, self.output_channel
            ) as request:
                LOG.debug(f"Received post-shutdown request: {request}")

                if request.method == "exit":
                    return 0
                else:
                    raise json_rpc.InvalidRequestError("LSP server has been shut down")

    async def process_open_request(
        self, parameters: lsp.DidOpenTextDocumentParameters
    ) -> None:
        """
        Handle `textDocument/didOpen`. Currently this only validates that the
        document URI refers to a file.

        Raises `json_rpc.InvalidRequestError` when it does not.
        """
        document_path = parameters.text_document.document_uri().to_file_path()
        if document_path is None:
            raise json_rpc.InvalidRequestError(
                f"Document URI is not a file: {parameters.text_document.uri}"
            )

    async def process_close_request(
        self, parameters: lsp.DidCloseTextDocumentParameters
    ) -> None:
        """
        Handle `textDocument/didClose`: validate that the document URI refers
        to a file and log the closed path.

        Raises `json_rpc.InvalidRequestError` when the URI is not a file.
        """
        document_path = parameters.text_document.document_uri().to_file_path()
        if document_path is None:
            raise json_rpc.InvalidRequestError(
                f"Document URI is not a file: {parameters.text_document.uri}"
            )
        # This server keeps no table of opened documents, so there is no
        # lookup that could fail here; logging is all that needs to happen.
        LOG.info(f"File closed: {document_path}")

    async def process_did_save_request(
        self, parameters: lsp.DidSaveTextDocumentParameters
    ) -> None:
        """
        Handle `textDocument/didSave`. Currently this only validates that the
        document URI refers to a file.

        Raises `json_rpc.InvalidRequestError` when it does not.
        """
        document_path = parameters.text_document.document_uri().to_file_path()
        if document_path is None:
            raise json_rpc.InvalidRequestError(
                f"Document URI is not a file: {parameters.text_document.uri}"
            )

    async def run(self) -> int:
        """
        Serve LSP messages until the client exits and return the exit code.

        - `exit` without a preceding `shutdown` returns
          `commands.ExitCode.FAILURE`.
        - `shutdown` is acknowledged with a success response, after which the
          result of `wait_for_exit` is returned.
        - `textDocument/didOpen`, `didClose` and `didSave` are dispatched to
          their handlers; missing parameters raise
          `json_rpc.InvalidRequestError`.
        - Any other message carrying an id raises `lsp.RequestCancelledError`;
          other notifications are ignored.
        """
        while True:
            async with _read_lsp_request(
                self.input_channel, self.output_channel
            ) as request:
                if request.method == "exit":
                    return commands.ExitCode.FAILURE
                elif request.method == "shutdown":
                    # Must be awaited: otherwise the coroutine is created and
                    # dropped, and the client never receives the response.
                    await lsp.write_json_rpc(
                        self.output_channel,
                        json_rpc.SuccessResponse(id=request.id, result=None),
                    )
                    return await self.wait_for_exit()
                elif request.method == "textDocument/didOpen":
                    parameters = request.parameters
                    if parameters is None:
                        raise json_rpc.InvalidRequestError(
                            "Missing parameters for didOpen method"
                        )
                    await self.process_open_request(
                        lsp.DidOpenTextDocumentParameters.from_json_rpc_parameters(
                            parameters
                        )
                    )
                elif request.method == "textDocument/didClose":
                    parameters = request.parameters
                    if parameters is None:
                        raise json_rpc.InvalidRequestError(
                            "Missing parameters for didClose method"
                        )
                    await self.process_close_request(
                        lsp.DidCloseTextDocumentParameters.from_json_rpc_parameters(
                            parameters
                        )
                    )
                elif request.method == "textDocument/didSave":
                    parameters = request.parameters
                    if parameters is None:
                        raise json_rpc.InvalidRequestError(
                            "Missing parameters for didSave method"
                        )
                    await self.process_did_save_request(
                        lsp.DidSaveTextDocumentParameters.from_json_rpc_parameters(
                            parameters
                        )
                    )
                elif request.id is not None:
                    raise lsp.RequestCancelledError("Request not supported yet")


async def run_persistent(
    binary_location: str, server_identifier: str, pysa_arguments: start.Arguments
) -> int:
    """
    Run the language server over stdin/stdout.

    Repeats the initialization handshake until it either succeeds or the
    client exits. On success, a `PysaServer` is created and the return value
    of its `run` method is returned. An exit before initialization returns 0.
    """
    stdin, stdout = await connection.create_async_stdin_stdout()
    while True:
        initialize_result = await try_initialize(stdin, stdout)
        if isinstance(initialize_result, InitializationExit):
            LOG.info("Received exit request before initialization.")
            return 0
        elif isinstance(initialize_result, InitializationSuccess):
            LOG.info("Initialization successful.")
            client_info = initialize_result.client_info
            _log_lsp_event(
                remote_logging=pysa_arguments.base_arguments.remote_logging,
                event=LSPEvent.INITIALIZED,
                normals=(
                    {}
                    if client_info is None
                    else {
                        "lsp client name": client_info.name,
                        "lsp client version": client_info.version,
                    }
                ),
            )

            client_capabilities = initialize_result.client_capabilities
            LOG.debug(f"Client capabilities: {client_capabilities}")
            server = PysaServer(
                input_channel=stdin,
                output_channel=stdout,
                client_capabilities=client_capabilities,
                binary_location=binary_location,
                server_identifier=server_identifier,
                pyre_arguments=pysa_arguments,
            )
            return await server.run()
        elif isinstance(initialize_result, InitializationFailure):
            exception = initialize_result.exception
            message = (
                str(exception) if exception is not None else "ignoring notification"
            )
            LOG.info(f"Initialization failed: {message}")
            # Loop until we get either InitializationExit or
            # InitializationSuccess
        else:
            raise RuntimeError("Cannot determine the type of initialize_result")


def run(
    configuration: configuration_module.Configuration,
    start_arguments: command_arguments.StartArguments,
) -> int:
    """
    Entry point of the command: resolve the binary, the server identifier and
    the server arguments from `configuration`, then run `run_persistent` on
    the event loop and return its exit code.

    Raises `configuration_module.InvalidConfiguration` when no binary can be
    located, and `commands.ClientException` when the server arguments have no
    watchman root.
    """
    binary_location = configuration.get_binary_respecting_override()
    if binary_location is None:
        raise configuration_module.InvalidConfiguration(
            "Cannot locate a Pyre binary to run."
        )

    server_identifier = start.get_server_identifier(configuration)
    pyre_arguments = start.create_server_arguments(configuration, start_arguments)
    if pyre_arguments.watchman_root is None:
        raise commands.ClientException(
            (
                "Cannot locate a `watchman` root. Pyre's server will not function "
                + "properly."
            )
        )

    return asyncio.get_event_loop().run_until_complete(
        run_persistent(binary_location, server_identifier, pyre_arguments)
    )