compile/remote-compile/lbc/tool/objelf.py (228 lines of code) (raw):
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
File Name: objelf
Description :
Author : liaozhaoyan
date: 2022/11/5
-------------------------------------------------
Change Activity:
2022/11/5:
-------------------------------------------------
"""
__author__ = 'liaozhaoyan'
import datetime
import os
import sys
import shlex
import time
import json
import gc
import re
from subprocess import PIPE, Popen
from threading import Thread, Lock
from atobj import CatObj
from dict2db import Cobj2db
class Cobj2json(object):
reCellHead = re.compile(r"\<\d+\>\<[0-9a-f]+\>\:")
reInParenthesis = re.compile(r"(?<=\()[^\(\)]+(?=\))")
def __init__(self):
super(Cobj2json, self).__init__()
self._atObj = CatObj()
self._getStr = None
def _compileUnitHead(self, lines):
while True:
for i, line in enumerate(lines[:-1]):
if line.startswith("Contents of the") and not line.startswith("Contents of the .debug_info section:"):
return 0, None
if Cobj2json.reCellHead.search(line):
return 1, lines[i:]
tail = lines[-1]
s = tail + self._getStr()
lines = s.split("\n")
def _compileUnitEnd(self, lines):
res = []
while True:
for i, line in enumerate(lines[:-1]):
line = line.lstrip()
if len(line):
if line[0] == "<":
res.append(line)
else:
return res, lines[i:]
tail = lines[-1]
s = tail + self._getStr()
lines = s.split("\n")
def _findHead(self):
tail = ""
while True:
s = tail + self._getStr()
lines = s.split("\n")
for i, line in enumerate(lines[:-1]):
if line.startswith("Contents of the .debug_info section:"):
return lines[i:]
tail = lines[-1]
def _parseLine(self, line):
if Cobj2json.reCellHead.search(line):
head, content = line.split(":", 1)
levels, offsets = head.split("><")
level = int(levels[1:], 10)
offset = int(offsets[:-1], 16)
tag = "tag_name"
fit = Cobj2json.reInParenthesis.findall(content)
if len(fit):
value = fit[0]
else:
value = None
else:
level = -1
head, content = line.split(">", 1)
offset = int(head[1:], 16)
tags, values = content.split(":", 1)
tag = tags.strip()
if tag in self._atObj.atDict:
values = values.lstrip()
value = self._atObj.parse(tag, values)
elif tag == "Unknown AT value":
tag = None
value = ""
else:
print("\t tag: %s not in at dict. value: %s", tag, values)
value = values.strip()
return level, offset, tag, value
def _decode(self, lines):
lastLevel = 0
dStack = []
dRet = {}
for i, line in enumerate(lines):
level, offset, tag, value = self._parseLine(line)
if tag is None: # for Unknown AT value
continue
if level >= 0:
if value is None:
continue
dRet = {tag: value, "offset": offset}
if level == lastLevel:
if level == 0:
dStack.append(dRet)
else:
dParent = dStack[level - 1]
dRet = {tag: value, "offset": offset}
if "child" not in dParent:
raise ValueError("should add child.")
else:
dParent['child'].append(dRet)
dStack[level] = dRet
elif level > lastLevel: # > need add child.
if level > lastLevel + 1:
print("pid %d" % os.getpid())
raise ValueError("level %d, lastLevel %d" % (level, lastLevel))
dParent = dStack[level - 1]
if "child" in dParent:
raise ValueError("should no child.")
dParent['child'] = [dRet]
dStack.append(dRet)
lastLevel = level
elif level < lastLevel: # back to last level
dParent = dStack[level - 1]
dParent['child'].append(dRet)
dStack = dStack[:level+1]
dStack[level] = dRet
lastLevel = level
else: # single line.
dRet[tag] = value
return dStack[0]
def accept(self, getStr):
self._getStr = getStr
tail = self._findHead()
flag, tail = self._compileUnitHead(tail)
while flag:
lines, tail = self._compileUnitEnd(tail)
# with open("cell.txt", "w") as f:
# f.write("\n".join(lines))
yield self._decode(lines)
flag, tail = self._compileUnitHead(tail)
class Ctest(object):
def __init__(self, path="/root/1ext/code/surftrace/tools/dwarf/walks/samelf/dwarf.txt"):
super(Ctest, self).__init__()
self._handle = open(path, 'r')
self._db = Cobj2db("elf", "info.db")
def read(self):
res = self._handle.read(1024)
if res == "":
raise OSError("end of file")
return res
def test(self):
o = Cobj2json()
ana = o.accept(self.read)
for res in ana:
self._db.walks(res)
del res
gc.collect()
class CmonThread(Thread):
def __init__(self, mon):
super(CmonThread, self).__init__()
self.daemon = True
self._mon = mon
self.start()
def run(self):
while True:
time.sleep(0.1)
if self._mon.poll() is not None:
self._mon.closePipe()
break
class CobjElf(object):
def __init__(self, elf):
super(CobjElf, self).__init__()
if not os.path.exists(elf) or not os.path.isfile(elf):
raise ValueError("%s is not a file" % elf)
self._l = Lock()
self._r, self._w = -1, -1
self._elf = elf
self._p = None
def __del__(self):
if hasattr(self, "_p"):
self.closePipe()
def _exec(self):
cmd = "readelf -w %s" % self._elf
with self._l:
self._r, self._w = os.pipe()
self._p = Popen(shlex.split(cmd), stdout=self._w)
return self._p
def poll(self):
res = 0
with self._l:
if hasattr(self, "_p"):
res = self._p.poll()
return res
def closePipe(self):
with self._l:
if self._r >= 0:
os.close(self._r)
os.close(self._w)
self._r = -1
self._w = -1
try:
self._p.terminate()
except OSError:
pass
del self._p
def _read(self, size=16384):
if sys.version_info.major == 2:
res = os.read(self._r, size)
else:
res = os.read(self._r, size).decode()
if res is None or res == "":
raise OSError("end of file.")
return res
def out(self, toFile):
self._exec()
with open(toFile, 'w') as f:
while True:
try:
s = self._read()
except OSError:
break
f.write(s)
def toDb(self, name, db):
o = Cobj2json()
self._exec()
t = CmonThread(self)
ana = o.accept(self._read)
for res in ana:
obj = Cobj2db(name, db)
# with open("vm.json", "w") as f:
# json.dump(res, f)
obj.walks(res)
self.closePipe()
t.join(0.1)
if __name__ == "__main__":
e = CobjElf("/home/lbc/.output/lbc.bpf.o")
# e.out("vm.db")
t1 = datetime.datetime.now()
e.toDb("bpf", "bpf.db")
print("to Db.", datetime.datetime.now() - t1)
# t = Ctest()
# t = Ctest("./vm.txt")
# t.test()
pass