tools/photongdb.py (193 lines of code) (raw):

#!/usr/bin/python # -*- coding: utf-8 -*- import gdb class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' CMAP = { 'CURRENT': bcolors.OKGREEN, 'READY': bcolors.OKCYAN, 'SLEEP': bcolors.OKBLUE, 'WARNING': bcolors.HEADER, 'SWITCH': bcolors.BOLD, 'INFO': bcolors.OKGREEN, } enabling = False photon = [] def get_arch(): frame = gdb.selected_frame() arch = frame.architecture() return arch.name() def get_regs(arch): regs = {} if arch == 'aarch64': regs['sp'] = '$sp' regs['bp'] = '$x29' regs['ip'] = '$pc' else: regs['sp'] = '$rsp' regs['bp'] = '$rbp' regs['ip'] = '$rip' return regs def cprint(stat, *args): print('{}{}{} {}'.format(CMAP[stat], stat, bcolors.ENDC, ' '.join(str(x) for x in args))) def get_next_ready(p): return gdb.parse_and_eval("(void*)gdb_get_next_thread((void*){})".format(p)) def get_current(): return gdb.parse_and_eval("(void*)gdb_get_current_thread()") def get_vcpu(p): return gdb.parse_and_eval("(void*)gdb_get_vcpu((void*){})".format(p)) def get_thread_stack_ptr(p): return gdb.parse_and_eval("(void*)gdb_get_thread_stack_ptr((void*){})".format(p)) def in_sleep(q): size = int(gdb.parse_and_eval("(size_t)gdb_get_sleepq_size((void*){})".format(q))) return [gdb.parse_and_eval("(void*)gdb_get_sleepq_item((void*){}, {})".format(q, i)) for i in range(size)] def switch_to_ph(regs, rsp, rbp, rip): gdb.parse_and_eval("{}={}".format(regs['sp'], rsp)) gdb.parse_and_eval("{}={}".format(regs['bp'], rbp)) gdb.parse_and_eval("{}={}".format(regs['ip'], rip)) def get_u64_ptr(p): return int(gdb.parse_and_eval("(uint64_t)({})".format(p))) def get_u64_val(p): return int(gdb.parse_and_eval("*(uint64_t*)({})".format(p))) def get_u64_reg(p): return get_u64_ptr(p) def set_u64_reg(l, r): return gdb.parse_and_eval("{} = (uint64_t)({})".format(l, r)) def get_stkregs(p): t = get_u64_ptr(get_thread_stack_ptr(p)) rsp = t + 8 rip = get_u64_val(t + 8) rbp = get_u64_val(t) return rsp, rbp, rip def load_photon_threads(): global photon if enabling: return photon = [] c = get_current() if c == gdb.parse_and_eval("0"): return photon.append(('CURRENT', c, get_u64_reg('$saved_rsp'), get_u64_reg('$saved_rbp'), get_u64_reg('$saved_rip'))) p = get_next_ready(c) while p != c: rsp, rbp, rip = get_stkregs(p) photon.append(('READY', p, rsp, rbp, rip)) p = get_next_ready(p) vcpu = get_vcpu(c) for t in in_sleep(vcpu): rsp, rbp, rip = get_stkregs(t) photon.append(('SLEEP', t, rsp, rbp, rip)) return class PhotonThreads(gdb.Command): def __init__(self): gdb.Command.__init__(self, "photon_current", gdb.COMMAND_STACK, gdb.COMPLETE_NONE) def invoke(self, arg, tty): photon_init() cprint("CURRENT", get_current().dereference()) class PhotonLs(gdb.Command): def __init__(self): gdb.Command.__init__( self, "photon_ls", gdb.COMMAND_STACK, gdb.COMPLETE_NONE) def invoke(self, arg, tty): photon_init() for i, (stat, pth, rsp, rbp, rbi) in enumerate(photon): cprint( stat, '[{}]'.format(i), pth, hex(rsp), hex(rbp), hex(rbi)) class PhotonFr(gdb.Command): def __init__(self): gdb.Command.__init__( self, "photon_fr", gdb.COMMAND_STACK, gdb.COMPLETE_NONE) def invoke(self, arg, tty): if not enabling: print("Photon debugger not init") return i = int(arg) if i < 0 or i > len(photon): print("No such photon thread") return arch = get_arch() regs = get_regs(arch) cprint('SWITCH', "to {} {} {}".format(hex(photon[i][2]), hex(photon[i][3]), hex(photon[i][4]))) switch_to_ph(regs, photon[i][2], photon[i][3], photon[i][4]) def photon_init(): global photon arch = get_arch() regs = get_regs(arch) set_u64_reg('$saved_rsp', regs['sp']) set_u64_reg('$saved_rbp', regs['bp']) set_u64_reg('$saved_rip', regs['ip']) load_photon_threads() if len(photon) == 0: return class PhotonInit(gdb.Command): def __init__(self): gdb.Command.__init__(self, "photon_init", gdb.COMMAND_STACK, gdb.COMPLETE_NONE) def invoke(self, arg, tty): global enabling photon_init() enabling = True cprint('WARNING', "Entered photon thread lookup mode. PLEASE do not trying step-in or continue before `photon_fini`") def photon_restore(): if not enabling: return arch = get_arch() regs = get_regs(arch) set_u64_reg(regs['sp'], '$saved_rsp') set_u64_reg(regs['bp'], '$saved_rbp') set_u64_reg(regs['ip'], '$saved_rip') class PhotonRestore(gdb.Command): def __init__(self): gdb.Command.__init__(self, "photon_rst", gdb.COMMAND_STACK, gdb.COMPLETE_NONE) def invoke(self, arg, tty): photon_restore() class PhotonFini(gdb.Command): def __init__(self): gdb.Command.__init__(self, "photon_fini", gdb.COMMAND_STACK, gdb.COMPLETE_NONE) def invoke(self, arg, tty): global photon global enabling if not enabling: return photon_restore() photon = [] enabling = False cprint('WARNING', "Finished photon thread lookup mode.") from threading import Lock class PhotonPs(gdb.Command): def __init__(self): gdb.Command.__init__(self, "photon_ps", gdb.COMMAND_STACK, gdb.COMPLETE_NONE) self.lock = Lock() def invoke(self, arg, tty): with self.lock: photon_init() if len(photon) > 0: for i, (stat, pth, rsp, rbp, rbi) in enumerate(photon): cprint( stat, '[{}]'.format(i), pth, hex(rsp), hex(rbp), hex(rbi)) arch = get_arch() regs = get_regs(arch) switch_to_ph(regs, rsp, rbp, rbi) gdb.execute("bt") switch_to_ph(regs, photon[0][2], photon[0][3], photon[0][4]) photon_restore() PhotonInit() PhotonFini() PhotonRestore() PhotonThreads() PhotonLs() PhotonFr() PhotonPs() cprint('INFO', 'Photon-GDB-extension loaded')