idb/grpc/dap.py (101 lines of code) (raw):

#!/usr/bin/env python3 # Copyright (c) Meta Platforms, Inc. and affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import asyncio import logging from asyncio import StreamWriter, StreamReader from typing import Optional, AsyncGenerator from idb.common.types import IdbException from idb.grpc.idb_grpc import CompanionServiceStub from idb.grpc.idb_pb2 import ( DapResponse, DapRequest, ) from idb.grpc.stream import Stream from idb.utils.contextlib import asynccontextmanager from idb.utils.typing import none_throws class RemoteDapServer: """ Manage the connection to the remote dap server spawn by the companion """ def __init__( self, stream: Stream[DapRequest, DapResponse], logger: logging.Logger, ) -> None: self._stream = stream self.logger = logger @staticmethod @asynccontextmanager async def start( stub: CompanionServiceStub, logger: logging.Logger, pkg_id: str ) -> AsyncGenerator["RemoteDapServer", None]: """ Created a RemoteDapServer starting a new grpc stream and sending a start dap server request to companion """ logger.info("Starting dap connection") async with stub.dap.open() as stream: await stream.send_message( DapRequest(start=DapRequest.Start(debugger_pkg_id=pkg_id)) ) response = await stream.recv_message() logger.debug(f"Dap response after start request: {response}") if response and response.started: logger.info("Dap stream ready to receive messages") dap_server = RemoteDapServer( stream=stream, logger=logger, ) try: yield dap_server finally: await dap_server.__stop() else: logger.error(f"Starting dap server failed! {response}") raise IdbException("Failed to spawn dap server.") logger.info("Dap grpc stream is closed.") async def pipe( self, input_stream: StreamReader, output_stream: StreamWriter, stop: asyncio.Event, ) -> None: """ Pipe stdin and stdout to remote dap server """ read_future: Optional[asyncio.Future[StreamReader]] = None write_future: Optional[asyncio.Future[StreamWriter]] = None stop_future = asyncio.ensure_future(stop.wait()) while True: if read_future is None: read_future = asyncio.ensure_future(self._stream.recv_message()) if write_future is None: write_future = asyncio.ensure_future( read_next_dap_protocol_message(input_stream) ) done, pending = await asyncio.wait( [read_future, write_future, stop_future], return_when=asyncio.FIRST_COMPLETED, ) if stop_future in done: self.logger.debug("Received stop command! Closing stream...") read_future.cancel() self.logger.debug("Read future cancelled!") write_future.cancel() self.logger.debug("Write future cancelled!") break if write_future in done: data = none_throws(write_future.result()) write_future = None await self._stream.send_message( DapRequest(pipe=DapRequest.Pipe(data=data)) ) if read_future in done: self.logger.debug("Received a message from companion.") result = none_throws(read_future.result()) read_future = None if result is None: # Reached the end of the stream break output_stream.write(result.stdout.data) async def __stop(self) -> None: """ Stop remote dap server and end grpc stream """ self.logger.debug("Sending stop dap request to close the stream.") await self._stream.send_message(DapRequest(stop=DapRequest.Stop())) await self._stream.end() response = await self._stream.recv_message() if response and not response.stopped: self.logger.error(f"Dap server failed to stop: {response}") else: self.logger.info(f"Dap server successfully stopped: {response}") async def read_next_dap_protocol_message(stream: StreamReader) -> bytes: content_length = await stream.readuntil(b"\r\n\r\n") length = content_length.decode("utf-8").split(" ")[1] body = await stream.read(int(length)) return content_length + body