client/commands/language_server_protocol.py (450 lines of code) (raw):

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""
Data types and framing helpers for speaking the Language Server Protocol
(LSP) over JSON-RPC.

The module contains:
- helpers to read/write `Content-Length`-framed JSON-RPC messages,
- dataclasses (de)serialized with `dataclasses_json` using camelCase keys,
- conversions between Pyre's 1-indexed line positions and LSP's 0-indexed
  line positions.
"""

import asyncio
import dataclasses
import enum
import logging
import urllib

# `import urllib` alone does not guarantee that the `urllib.parse` submodule
# is loaded; import it explicitly since `DocumentUri` relies on it.
import urllib.parse
from pathlib import Path
from typing import (
    Iterable,
    Optional,
    List,
    Type,
    TypeVar,
)

import dataclasses_json

from .. import json_rpc
from . import async_server_connection

LOG: logging.Logger = logging.getLogger(__name__)

T = TypeVar("T")
Point = TypeVar("Point")
Value = TypeVar("Value")


class ServerNotInitializedError(json_rpc.JSONRPCException):
    """JSON-RPC exception reported with error code -32002."""

    def error_code(self) -> int:
        return -32002


class RequestCancelledError(json_rpc.JSONRPCException):
    """JSON-RPC exception reported with error code -32800."""

    def error_code(self) -> int:
        return -32800


async def _read_headers(input_channel: async_server_connection.TextReader) -> List[str]:
    """
    Read header lines from `input_channel` until the blank `\\r\\n` separator.

    Returns the header lines read so far (each still carrying its trailing
    `\\r\\n`); the blank separator line itself is consumed but not returned.
    """
    headers = []
    header = await input_channel.read_until("\r\n")
    while header != "\r\n":
        headers.append(header)
        header = await input_channel.read_until("\r\n")
    return headers


def _get_content_length(headers: Iterable[str]) -> int:
    """
    Extract the value of the `Content-Length` header (case-insensitive).

    Raise `json_rpc.ParseError` if no such header exists (including when
    `headers` is empty) or if its value is not an integer.
    """
    # Initialized up-front so that the "not found" error below is well-defined
    # even when `headers` yields nothing.
    parts: List[str] = []
    try:
        for header in headers:
            parts = [part.strip().lower() for part in header.split(":", maxsplit=1)]
            if len(parts) <= 1:
                # Not a `name: value` line; ignore it.
                continue
            if parts[0] == "content-length":
                return int(parts[1])
        raise json_rpc.ParseError(f"Failed to find content length header from {parts}")
    except ValueError as error:
        raise json_rpc.ParseError(
            f"Cannot parse content length into integer: {error}"
        ) from error


async def read_json_rpc(
    input_channel: async_server_connection.TextReader,
) -> json_rpc.Request:
    """
    Asynchronously read a JSON-RPC request from the given input channel.
    May raise `json_rpc.ParseError`, `json_rpc.InvalidRequestError` and
    `json_rpc.InvalidParameterError`.
    """
    try:
        headers = await _read_headers(input_channel)
        content_length = _get_content_length(headers)

        payload = await input_channel.read_exactly(content_length)
        return json_rpc.Request.from_string(payload)
    except asyncio.IncompleteReadError as error:
        # The channel ended before a full message could be read.
        raise json_rpc.ParseError(str(error)) from error


async def write_json_rpc(
    output_channel: async_server_connection.TextWriter, response: json_rpc.JSONRPC
) -> None:
    """
    Asynchronously write a JSON-RPC response to the given output channel.

    The message is framed with a `Content-Length` header followed by a blank
    line, as required by the LSP base protocol.
    """
    payload = response.serialize()
    # The LSP base protocol defines Content-Length as the size of the content
    # in bytes, not characters. NOTE(review): this assumes the channel encodes
    # text as UTF-8 (the LSP default) -- confirm against `TextWriter`.
    content_length = len(payload.encode("utf-8"))
    await output_channel.write(f"Content-Length: {content_length}\r\n\r\n{payload}")


def _parse_parameters(parameters: json_rpc.Parameters, target: Type[T]) -> T:
    """
    Parse the given JSON-RPC parameters into specified LSP parameters.
    Raise `json_rpc.InvalidRequestError` on parsing failure.
    """
    if not isinstance(parameters, json_rpc.ByNameParameters):
        raise json_rpc.InvalidRequestError(
            "Parameters for LSP requests must be passed by name"
        )
    try:
        # pyre-fixme[16]: Pyre doesn't understand `dataclasses_json`
        return target.schema().load(parameters.values)
    except (KeyError, ValueError, dataclasses_json.mm.ValidationError) as error:
        raise json_rpc.InvalidRequestError(str(error)) from error


class SerializationSafeIntEnum(enum.IntEnum):
    """`IntEnum` base whose `repr()` is the plain integer value."""

    # Must be spelled `__repr__`: a name with only leading double underscores
    # is name-mangled and would never be picked up by `repr()`.
    def __repr__(self) -> str:
        return str(self.value)


class DiagnosticTag(SerializationSafeIntEnum):
    UNNECESSARY = 1
    DEPRECATED = 2


class DiagnosticSeverity(SerializationSafeIntEnum):
    ERROR = 1
    WARNING = 2
    INFORMATION = 3
    HINT = 4


class TextDocumentSyncKind(SerializationSafeIntEnum):
    NONE = 0
    FULL = 1
    INCREMENTAL = 2


class MessageType(SerializationSafeIntEnum):
    ERROR = 1
    WARNING = 2
    INFO = 3
    LOG = 4


@dataclasses.dataclass(frozen=True)
class DocumentUri:
    """A URI split into its (unquoted) components."""

    scheme: str
    authority: str
    path: str
    query: str
    fragment: str

    def to_file_path(self) -> Optional[Path]:
        """Return the path for `file` URIs, `None` for any other scheme."""
        if self.scheme == "file":
            return Path(self.path)
        return None

    def unparse(self) -> str:
        """Percent-quote every component and assemble them into a URI string."""
        return urllib.parse.urlunparse(
            (
                urllib.parse.quote(self.scheme),
                urllib.parse.quote(self.authority),
                urllib.parse.quote(self.path),
                # `params` slot of the 6-tuple; not tracked by this class.
                "",
                urllib.parse.quote(self.query),
                urllib.parse.quote(self.fragment),
            )
        )

    @staticmethod
    def parse(uri: str) -> "DocumentUri":
        """Split `uri` into components and percent-unquote each of them."""
        parsed_uri = urllib.parse.urlparse(uri)
        return DocumentUri(
            scheme=urllib.parse.unquote(parsed_uri.scheme),
            authority=urllib.parse.unquote(parsed_uri.netloc),
            path=urllib.parse.unquote(parsed_uri.path),
            query=urllib.parse.unquote(parsed_uri.query),
            fragment=urllib.parse.unquote(parsed_uri.fragment),
        )

    @staticmethod
    def from_file_path(file_path: Path) -> "DocumentUri":
        """Build a `file` URI whose path is `str(file_path)`."""
        return DocumentUri(
            scheme="file", authority="", path=str(file_path), query="", fragment=""
        )


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True, order=True)
class Position:
    """A position in Pyre's convention (see `LspPosition` for the difference)."""

    line: int
    character: int

    def to_lsp_position(self) -> "LspPosition":
        """Convert to an LSP position by shifting the line down by one."""
        return LspPosition(self.line - 1, self.character)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class LspPosition:
    """LSP uses 0-indexing for lines whereas Pyre uses 1-indexing."""

    line: int
    character: int

    def to_pyre_position(self) -> "Position":
        """Convert to a Pyre position by shifting the line up by one."""
        return Position(self.line + 1, self.character)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class Range:
    """A start/end pair of Pyre `Position`s."""

    start: Position
    end: Position

    def to_lsp_range(self) -> "LspRange":
        """Convert both endpoints to `LspPosition`s."""
        return LspRange(
            start=self.start.to_lsp_position(),
            end=self.end.to_lsp_position(),
        )


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class LspRange:
    """A start/end pair of `LspPosition`s."""

    start: LspPosition
    end: LspPosition


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class Diagnostic:
    """A message attached to a source range, with optional metadata."""

    range: Range
    message: str
    severity: Optional[DiagnosticSeverity] = None
    code: Optional[int] = None
    source: Optional[str] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class Info:
    """Name and optional version; used for both client and server info."""

    name: str
    version: Optional[str] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class TextDocumentSyncClientCapabilities:
    """Client capabilities related to text document synchronization."""

    did_save: bool = False


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class PublishDiagnosticsClientTagSupport:
    """The set of `DiagnosticTag`s listed by the client."""

    value_set: List[DiagnosticTag] = dataclasses.field(default_factory=list)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class PublishDiagnosticsClientCapabilities:
    """Client capabilities related to published diagnostics."""

    related_information: bool = False
    tag_support: Optional[PublishDiagnosticsClientTagSupport] = None
    version_support: bool = False


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class TextDocumentClientCapabilities:
    """Text-document-related portion of `ClientCapabilities`."""

    synchronization: Optional[TextDocumentSyncClientCapabilities] = None
    publish_diagnostics: Optional[PublishDiagnosticsClientCapabilities] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class ShowStatusRequestClientCapabilities:
    """Field-less marker; see `WindowClientCapabilities.status`."""

    pass


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class WindowClientCapabilities:
    """Window-related portion of `ClientCapabilities`."""

    work_done_progress: Optional[bool] = None
    # Custom VSCode extension for status bar
    status: Optional[ShowStatusRequestClientCapabilities] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class ClientCapabilities:
    """Capabilities sent by the client inside `InitializeParameters`."""

    text_document: Optional[TextDocumentClientCapabilities] = None
    window: Optional[WindowClientCapabilities] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class SaveOptions:
    """Options nested under `TextDocumentSyncOptions.save`."""

    include_text: Optional[bool] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class TextDocumentSyncOptions:
    """Text document synchronization options reported by the server."""

    open_close: bool = False
    change: TextDocumentSyncKind = TextDocumentSyncKind.NONE
    save: Optional[SaveOptions] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class ServerCapabilities:
    """Capabilities returned by the server inside `InitializeResult`."""

    text_document_sync: Optional[TextDocumentSyncOptions] = None
    hover_provider: Optional[bool] = None
    definition_provider: Optional[bool] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class InitializationOptions:
    """Optional extra options carried by `InitializeParameters`."""

    notebook_number: Optional[int] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class InitializeParameters:
    """Parameters of an initialize request; only `capabilities` is required."""

    capabilities: ClientCapabilities
    process_id: Optional[int] = None
    client_info: Optional[Info] = None
    initialization_options: Optional[InitializationOptions] = None

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "InitializeParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(parameters, target=InitializeParameters)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class InitializeResult:
    """Result of an initialize request."""

    capabilities: ServerCapabilities
    server_info: Optional[Info] = None


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class TextDocumentIdentifier:
    """Identifies a text document by its URI string."""

    uri: str

    def document_uri(self) -> DocumentUri:
        """Return `uri` parsed into a `DocumentUri`."""
        return DocumentUri.parse(self.uri)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class TextDocumentItem:
    """A text document together with its language id, version and content."""

    uri: str
    language_id: str
    version: int
    text: str

    def document_uri(self) -> DocumentUri:
        """Return `uri` parsed into a `DocumentUri`."""
        return DocumentUri.parse(self.uri)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class DidOpenTextDocumentParameters:
    """Parameters of a document-open message: the full `TextDocumentItem`."""

    text_document: TextDocumentItem

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "DidOpenTextDocumentParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(parameters, target=DidOpenTextDocumentParameters)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class DidCloseTextDocumentParameters:
    """Parameters of a document-close message: the document identifier."""

    text_document: TextDocumentIdentifier

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "DidCloseTextDocumentParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(parameters, target=DidCloseTextDocumentParameters)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class DidSaveTextDocumentParameters:
    """Parameters of a document-save message; `text` is optional."""

    text_document: TextDocumentIdentifier
    text: Optional[str] = None

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "DidSaveTextDocumentParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(parameters, target=DidSaveTextDocumentParameters)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class WorkspaceConfiguration:
    """Settings payload of `WorkspaceDidChangeConfigurationParameters`."""

    kernel_runtime_dir: List[str]


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class WorkspaceDidChangeConfigurationParameters:
    """Parameters of a workspace configuration-change message."""

    settings: WorkspaceConfiguration

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "WorkspaceDidChangeConfigurationParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(
            parameters, target=WorkspaceDidChangeConfigurationParameters
        )


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class HoverTextDocumentParameters:
    """Parameters of a hover request: a document and an LSP position in it."""

    text_document: TextDocumentIdentifier
    position: LspPosition

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "HoverTextDocumentParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(parameters, target=HoverTextDocumentParameters)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class HoverResponse:
    """Contains the Markdown response to be shown in the hover card."""

    contents: str

    @staticmethod
    def empty() -> "HoverResponse":
        """Return a response whose contents is the empty string."""
        return HoverResponse(contents="")


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class TypeCoverageTextDocumentParameters:
    """Parameters of a type coverage request: the document identifier."""

    text_document: TextDocumentIdentifier

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "TypeCoverageTextDocumentParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(parameters, target=TypeCoverageTextDocumentParameters)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class TypeCoverageResult:
    """Result for nuclide-vscode-lsp coverage feature."""

    covered_percent: float
    uncovered_ranges: List[Diagnostic]
    default_message: str


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class DefinitionTextDocumentParameters:
    """Parameters of a definition request: a document and an LSP position."""

    text_document: TextDocumentIdentifier
    position: LspPosition

    @staticmethod
    def from_json_rpc_parameters(
        parameters: json_rpc.Parameters,
    ) -> "DefinitionTextDocumentParameters":
        """Parse by-name JSON-RPC parameters; see `_parse_parameters`."""
        return _parse_parameters(parameters, target=DefinitionTextDocumentParameters)


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class PyreDefinitionResponse:
    """Contains one possible definition for a symbol."""

    path: str
    range: Range

    def to_lsp_definition_response(
        self,
    ) -> "LspDefinitionResponse":
        """
        Convert to the LSP form: the range becomes an `LspRange` and `path`
        is passed through unchanged as the `uri`.
        """
        # NOTE(review): `path` is forwarded verbatim as `uri` -- confirm
        # whether callers expect a `file://` URI here.
        return LspDefinitionResponse(uri=self.path, range=self.range.to_lsp_range())


@dataclasses_json.dataclass_json(
    letter_case=dataclasses_json.LetterCase.CAMEL,
    undefined=dataclasses_json.Undefined.EXCLUDE,
)
@dataclasses.dataclass(frozen=True)
class LspDefinitionResponse:
    """Contains one possible definition for a symbol."""

    uri: str
    range: LspRange