functions/functraceplus.py (249 lines of code) (raw):

import idaapi import idc import ida_ida import ida_dbg import logging import ida_nalt import os from kernel32 import Kernel32 from advapi32 import Advapi32 # Calling conventions CALL_CONV_CDECL = 0 CALL_CONV_STDCALL = 1 CALL_CONV_FASTCALL = 2 CALL_CONV_FLOAT = 3 VAR_ARGS = -1 functions = { "kernel32" : Kernel32(), "advapi32" : Advapi32() } # logging setting logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') class FuncTracePlus(ida_dbg.DBG_Hooks): def __init__(self, ignore_bp = True): ida_dbg.DBG_Hooks.__init__(self) self.ignore_bp = ignore_bp self.is_64bit = True if idaapi.BADADDR == 0xffffffffffffffff else False self.module_funcs = {} self.module_address_name = {} self.entry_points = [] self.to_hook_func = {} self.break_points = {} self.break_points_ret_addr = {} self.getproc_func_ea = None self.getproc_func_addr = None self.getproc_func_ret_addrs = {} def dbg_process_start(self, pid, tid, ea, modinfo_name, modinfo_base, modinfo_size): """Handle process start event.""" ep = ida_ida.inf_get_start_ip() self.entry_points.append(ep) idc.add_bpt(ep) logging.info(f"process start => pid : {pid}, tid : {tid}, modinfo_name : {modinfo_name},\ modinfo_base : {hex(modinfo_base)}, modinfo_size : {hex(modinfo_size)}") def dbg_process_exit(self, pid, tid, ea, exit_code): for e in self.entry_points: idc.del_bpt(e) for addr in self.break_points: idc.del_bpt(addr) if self.getproc_func_addr != None and ida_dbg.exist_bpt(self.getproc_func_addr) == True: idc.del_bpt(self.getproc_func_addr) logging.info(f"process exit => pid : {pid}, tid : {tid}, ea : {hex(ea)}, exit_code : {exit_code}") def dbg_thread_start(self, pid, tid, ea): logging.info(f"thread start => pid : {pid}, tid : {tid}, ea : {hex(ea)}") def dbg_thread_exit(self, pid, tid, ea, exit_code): logging.info(f"thread exit => pid : {pid}, tid : {tid}, ea : {hex(ea)}, exit_code : {exit_code}") def dbg_library_load(self, pid, tid, ea, modinfo_name, modinfo_base, modinfo_size): # writefile self.module_address_name[modinfo_base] = os.path.splitext(os.path.basename(modinfo_name))[0].lower() logging.info(f"library load => pid : {pid}, tid : {tid}, ea : {hex(ea)},\ modinfo_name : {modinfo_name}, modinfo_base : {hex(modinfo_base)},\ modinfo_size : {hex(modinfo_size)}") # module_funcs to_hook_func def dbg_library_unload(self, pid, tid, ea, info): logging.info(f"unload library => pid : {pid}, tid : {tid}, ea : {hex(ea)}, info : {info}") def is_hit_entry_point(self, bptea): for entry in self.entry_points: if entry == bptea: # hit entry point return True return False def get_ptr_from_addr(self, addr): """Get pointer value from given address.""" ida_dbg.refresh_debugger_memory() if self.is_64bit: target_func_addr = idc.get_qword(addr) else: target_func_addr = idc.get_wide_dword(addr) return target_func_addr # add breakpoint on GetProcAddress and target hook funcs def hook_funcs_action(self): def hook(module_name, hook_func): if module_name not in self.module_funcs: # return for idx, func in enumerate(self.module_funcs[module_name]): if hook_func["name"] == func["name"]: target_func_addr = self.get_ptr_from_addr(func["ea"]) module_func_s = "%s!%s" % (module_name, hook_func["name"]) self.module_funcs[module_name][idx]["va"] = target_func_addr self.break_points[target_func_addr] = module_func_s logging.debug("hook:%s!%s => %s" % (module_name, hook_func["name"], hex(target_func_addr))) idc.add_bpt(target_func_addr) for module_name in self.to_hook_func: hook_funcs = self.to_hook_func[module_name] for hook_func in hook_funcs: hook(module_name, hook_func) if "kernel32!GetProcAddress" not in self.break_points: #self.getproc_func_addr = self.get_ptr_from_addr(idc.get_name_ea_simple("GetProcAddress")) for func in self.module_funcs["kernel32"]: if func["name"] == "GetProcAddress": self.getproc_func_ea = func["ea"] break self.getproc_func_addr = self.get_ptr_from_addr(self.getproc_func_ea) # self.break_points[self.getproc_func_addr] = "kernel32!GetProcAddress" logging.debug(f"hook:kernel32!GetProcAddress => {hex(self.getproc_func_addr)}") idc.add_bpt(self.getproc_func_addr) def find_hook_func_breakpoint(self, bptea): return bptea in self.break_points.values() def get_func_args(self, nth): ida_dbg.refresh_debugger_memory() if self.is_64bit: four_args = ["RCX", "RDX", "R8", "R9"] if nth == 0: rsp = idc.get_reg_value("RSP") result = idc.get_qword(rsp) elif nth < 5: result = idc.get_reg_value(four_args[nth - 1]) else: rsp = idc.get_reg_value("RSP") result = idc.get_qword(rsp + (nth - 4) * 8) else: esp = idc.get_reg_value("ESP") result = idc.get_wide_dword(esp + nth * 4) return result def get_register_value(self, register_name): if self.is_64bit: register_name = "R" + register_name else: register_name = "E" + register_name return idc.get_reg_value(register_name) def get_args_number(self, bptea): module_name, func_name = self.break_points[bptea].split("!") if module_name in self.to_hook_func: for hook_func in self.to_hook_func[module_name]: if hook_func["name"] == func_name: return hook_func["argc"] return None def hit_getprocaddress_or_ret_bp(self, bptea): # hit breakpoint on GetProcAddress if bptea == self.getproc_func_addr: hmodule = self.get_func_args(1) if hmodule in self.module_address_name: module_name = self.module_address_name[hmodule] get_proc_addr_ret_addr = self.get_func_args(0) func_name_addr = self.get_func_args(2) func_name_str = idc.get_strlit_contents(func_name_addr) # print(f"func_name_str : {func_name_str.decode('utf-8')}") # add a new module to module_funcs if module_name not in self.module_funcs: self.module_funcs[module_name] = [] if func_name_str != None: self.module_funcs[module_name].append({"name":func_name_str.decode('utf-8')}) self.getproc_func_ret_addrs[get_proc_addr_ret_addr] = \ { "module_name":module_name, "func_name":func_name_str.decode("utf-8"), "debug_info":f"hook function exec: {hex(get_proc_addr_ret_addr)} => kernel32!" } idc.add_bpt(get_proc_addr_ret_addr) return True # hit breakpoint on GetProcAddress return address if bptea in self.getproc_func_ret_addrs: _module_name = self.getproc_func_ret_addrs[bptea]["module_name"] _func_name = self.getproc_func_ret_addrs[bptea]["func_name"] _debug_info = self.getproc_func_ret_addrs[bptea]["debug_info"] target_func_addr = self.get_register_value("AX") if _debug_info != None: _debug_info += f"GetProcAddress({_module_name},{_func_name}) = {hex(target_func_addr)}" logging.debug(_debug_info) for idx, func in enumerate(self.module_funcs[_module_name]): if func["name"] == _func_name: self.module_funcs[_module_name][idx]["va"] = target_func_addr break if _module_name in self.to_hook_func: for idx, hook_func in enumerate(self.to_hook_func[_module_name]): if hook_func["name"] == _func_name: self.break_points[target_func_addr] = "%s!%s" % (_module_name, _func_name) if ida_dbg.exist_bpt(target_func_addr) == False: logging.debug("hook:%s!%s => %s" % (_module_name, hook_func["name"], hex(target_func_addr))) idc.add_bpt(target_func_addr) break idc.del_bpt(bptea) self.getproc_func_ret_addrs.pop(bptea) return True return False def hit_general_hook_func(self, bptea): if bptea in self.break_points: _module_name, _func_name = self.break_points[bptea].split('!') general_hook_func_ret_addr = self.get_func_args(0) args_number = self.get_args_number(bptea) if args_number != None: _debug_info = f"hook function exec: {hex(general_hook_func_ret_addr)} => {self.break_points[bptea]}(" if _module_name in functions and _func_name in functions[_module_name].total_func: _debug_info += functions[_module_name].total_func[_func_name]() else: for i in range(1, args_number + 1): _debug_info += f"{hex(self.get_func_args(i))}," _debug_info = _debug_info[:-1] + ")" self.break_points_ret_addr[general_hook_func_ret_addr] = \ { "module_name":_module_name, "func_name":_func_name, "debug_info":_debug_info } idc.add_bpt(general_hook_func_ret_addr) return True if bptea in self.break_points_ret_addr: _module_name = self.break_points_ret_addr[bptea]["module_name"] _func_name = self.break_points_ret_addr[bptea]["func_name"] _debug_info = self.break_points_ret_addr[bptea]["debug_info"] if _debug_info != None: if _module_name in functions and _func_name in functions[_module_name].total_func: _debug_info += functions[_module_name].total_func[_func_name](True) else: _debug_info += f"={hex(idc.get_reg_value('AX'))}" logging.debug(_debug_info) idc.del_bpt(bptea) self.break_points_ret_addr.pop(bptea) return True return False def dbg_bpt(self, tid, bptea): if self.is_hit_entry_point(bptea) == True: self.dbg_get_imports() self.hook_funcs_action() ida_dbg.request_continue_process() ida_dbg.run_requests() return 0 hit_res = self.hit_getprocaddress_or_ret_bp(bptea) if hit_res == False: hit_res = self.hit_general_hook_func(bptea) if hit_res == False and self.ignore_bp == False: return 0 ida_dbg.request_continue_process() ida_dbg.run_requests() return 0 def dbg_exception(self, pid, tid, ea, exc_code, exc_can_cont, exc_ea, exc_info): logging.debug("Exception: pid=%d tid=%d ea=0x%x exc_code=0x%x can_continue=%d exc_ea=0x%x exc_info=%s" % ( pid, tid, ea, exc_code & idaapi.BADADDR, exc_can_cont, exc_ea, exc_info)) def dbg_get_imports(self): nimps = ida_nalt.get_import_module_qty() for i in range(nimps): module_name = ida_nalt.get_import_module_name(i).lower() if not module_name: logging.warning("Failed to get import module name for #%d" % i) module_name = "<unnamed>" self.module_funcs[module_name] = [] def imp_cb(ea, name, ordinal): if not name: self.module_funcs[module_name].append({"ea":ea, "ordinal":ordinal}) else: self.module_funcs[module_name].append({"ea":ea, "ordinal":ordinal, "name":name}) return True ida_nalt.enum_import_names(i, imp_cb) def add_hook_functions(self, module_name, func_name, argc): if module_name not in self.to_hook_func: self.to_hook_func[module_name] = [] self.to_hook_func[module_name].append({"name":func_name, "argc" : argc}) if __name__ == "__main__": fh = FuncTracePlus(ignore_bp=False) fh.hook() # fh.add_hook_functions("user32", "MessageBoxA", 4, CALL_CONV_STDCALL) fh.add_hook_functions("kernel32", "CreateFileA", 7) fh.add_hook_functions("kernel32", "CreateFileW", 7) fh.add_hook_functions("kernel32", "ReadFile", 5) fh.add_hook_functions("kernel32", "WriteFile", 5) fh.add_hook_functions("kernel32", "CloseHandle", 1) # fh.add_hook_functions("kernel32", "CreateThread", 6) # fh.add_hook_functions("kernel32", "CreateProcessA", 10) # fh.add_hook_functions("kernel32", "CreateProcessW", 10) # fh.add_hook_functions("kernel32", "VirtualAlloc", 4) # fh.add_hook_functions("kernel32", "VirtualFree", 3) # fh.add_hook_functions("kernel32", "VirtualProtect", 4) # fh.add_hook_functions("kernel32", "IsDebuggerPresent", 0)