scripts/stetho_open.py (134 lines of code) (raw):

#!/usr/bin/env python3 ############################################################################### ## ## Simple utility class to create a forwarded socket connection to an ## application's stetho domain socket. ## ## Usage: ## ## sock = stetho_open( ## device='<serial-no>', ## process='com.facebook.stetho.sample') ## doHttp(sock) ## ############################################################################### import os import socket import struct import re def get_adb_server_port_from_server_socket(): socket_spec = os.environ.get('ADB_SERVER_SOCKET') if not socket_spec: return None if not socket_spec.startswith('tcp:'): raise HumanReadableError( 'Invalid or unsupported socket spec \'%s\' specified in ADB_SERVER_SOCKET.' % ( socket_spec)) return socket_spec.split(':')[-1] def get_adb_server_port(): defaultPort = 5037 portStr = get_adb_server_port_from_server_socket() or os.environ.get('ANDROID_ADB_SERVER_PORT') if portStr is None: return defaultPort elif portStr.isdigit(): return int(portStr) else: raise HumanReadableError( 'Invalid integer \'%s\' specified in ANDROID_ADB_SERVER_PORT or ADB_SERVER_SOCKET.' % ( portStr)) def stetho_open(device=None, process=None, port=None): if port is None: port = get_adb_server_port() adb = _connect_to_device(device, port) socket_name = None if process is None: socket_name = _find_only_stetho_socket(device, port) else: socket_name = _format_process_as_stetho_socket(process) try: adb.select_service('localabstract:%s' % (socket_name)) except SelectServiceError as e: raise HumanReadableError( 'Failure to target process %s: %s (is it running?)' % ( process, e.reason)) return adb.sock def read_input(sock, n, tag): data = b''; while len(data) < n: incoming_data = sock.recv(n - len(data)) if len(incoming_data) == 0: break data += incoming_data if len(data) != n: raise IOError('Unexpected end of stream while reading %s.' % tag) return data def _find_only_stetho_socket(device, port): adb = _connect_to_device(device, port) try: adb.select_service('shell:cat /proc/net/unix') last_stetho_socket_name = None process_names = [] for line in adb.sock.makefile(): row = re.split(r'\s+', line.rstrip()) if len(row) < 8: continue socket_name = row[7] if not socket_name.startswith('@stetho_'): continue # Filter out entries that are not server sockets if int(row[3], 16) != 0x10000 or int(row[5]) != 1: continue last_stetho_socket_name = socket_name[1:] process_names.append( _parse_process_from_stetho_socket(socket_name)) if len(process_names) > 1: raise HumanReadableError( 'Multiple stetho-enabled processes available:%s\n' % ( '\n\t'.join([''] + list(set(process_names)))) + 'Use -p <process> or the environment variable STETHO_PROCESS to ' + 'select one') elif last_stetho_socket_name == None: raise HumanReadableError('No stetho-enabled processes running') else: return last_stetho_socket_name finally: adb.sock.close() def _connect_to_device(device=None, port=None): if port is None: raise HumanReadableError('Must specify a port when calling _connect_to_device') adb = AdbSmartSocketClient() adb.connect(port) try: if device is None: adb.select_service('host:transport-any') else: adb.select_service('host:transport:%s' % (device)) return adb except SelectServiceError as e: raise HumanReadableError( 'Failure to target device %s: %s' % (device, e.reason)) def _parse_process_from_stetho_socket(socket_name): m = re.match("^\@stetho_(.+)_devtools_remote$", socket_name) if m is None: raise Exception('Unexpected Stetho socket formatting: %s' % (socket_name)) return m.group(1) def _format_process_as_stetho_socket(process): return 'stetho_%s_devtools_remote' % (process) class AdbSmartSocketClient(object): """Implements the smartsockets system defined by: https://android.googlesource.com/platform/system/core/+/master/adb/protocol.txt """ def __init__(self): pass def connect(self, port=5037): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect(('localhost', port)) except ConnectionRefusedError: sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.connect(('localhost', port)) self.sock = sock def select_service(self, service): message = '%04x%s' % (len(service), service) self.sock.send(message.encode('ascii')) status = read_input(self.sock, 4, "status") if status == b'OKAY': # All good... pass elif status == b'FAIL': reason_len = int(read_input(self.sock, 4, "fail reason"), 16) reason = read_input(self.sock, reason_len, "fail reason lean").decode('ascii') raise SelectServiceError(reason) else: raise Exception('Unrecognized status=%s' % (status)) class SelectServiceError(Exception): def __init__(self, reason): self.reason = reason def __str__(self): return repr(self.reason) class HumanReadableError(Exception): def __init__(self, reason): self.reason = reason def __str__(self): return self.reason