in memory_analyzer/analysis_utils.py [0:0]
def drain_pipe(self, process):
"""
We need this because by default, `open`s on named pipes block. If GDB or
the injected GDB extension in Python crash, the process will never write
to the pipe and we will block opening and `memory_analyzer` won't exit.
"""
try:
pipe = os.open(self.fifo, os.O_RDONLY | os.O_NONBLOCK)
result = io.BytesIO()
timeout = 0.1 # seconds
partial_read = None
while bool(partial_read) or process.poll() is None:
ready_fds, _, _ = select.select([pipe], [], [], timeout)
if len(ready_fds) > 0:
ready_fd = ready_fds[0]
try:
partial_read = os.read(ready_fd, select.PIPE_BUF)
except BlockingIOError:
partial_read = None
if partial_read:
result.write(partial_read)
result.seek(0)
yield result
result.close()
except Exception as e:
frontend_utils.echo_error(f"Failed with {e}")
self._end_subprocess(process)
sys.exit(1)
finally:
os.close(pipe)