btf/btfhive/aarch64/getVmlinux.py (149 lines of code) (raw):
# -*- coding: utf-8 -*-
# cython:language_level=3
"""
-------------------------------------------------
File Name: getVmlinux
Description :
Author : liaozhaoyan
date: 2021/12/1
-------------------------------------------------
Change Activity:
2021/12/1:
-------------------------------------------------
"""
__author__ = 'liaozhaoyan'
import sys
import os
import shlex
from subprocess import PIPE, Popen
from getFuncs import CgetVminfo, CgenfuncsDb
HivePath = "/home/vmhive/aarch64/"
VMPath = HivePath + "vmlinux/"
BTFPath = HivePath + "btf/"
HeadPath = HivePath + "header/"
FuncPath = HivePath + "funcs/"
DBPath = HivePath + "db/"
PkgPath = HivePath + "pkg/"
RVMPath = PkgPath
class CexecCmd(object):
def __init__(self):
super(CexecCmd, self).__init__()
@staticmethod
def cmd(cmds):
p = Popen(shlex.split(cmds), stdout=PIPE)
return p.stdout.read().decode().strip()
@staticmethod
def system(cmds):
cmds = cmds.replace('\0', '').strip()
return os.system(cmds)
class CgetVmlinux(CexecCmd):
def __init__(self):
super(CgetVmlinux, self).__init__()
def drpm(self, path):
self.system("rpm2cpio %s|cpio -id" % path)
def ddeb(self, path):
self.cmd("ar x %s" % path)
if os.path.exists("data.tar.xz"):
self.cmd("xz -d data.tar.xz")
self.cmd("tar xf data.tar")
else:
self.cmd("tar -I zstd -xvf data.tar.zst")
def _genRpmVer(self, name):
_, _, n = name.split("-", 2)
ver, _ = n.rsplit(".", 1)
return ver
#linux-image-3.13.0-96-generic-dbgsym_3.13.0-96.143_i386.ddeb -> 3.13.0-96-generic
def _genDebVer(self, name):
if name.startswith("linux-image-unsigned"):
_, _, _, n = name.split("-", 3)
else:
_, _, n = name.split("-", 2)
ver, _ = n.split("-dbgsym", 1)
return ver
def copyVmlinuxRpm(self, name, release):
res = self.cmd("find ./ -name vmlinux").strip("\n")
ver = self._genRpmVer(name)
dPath = VMPath + "%s/vmlinux-%s" % (release, ver)
cmd = "cp %s %s" % (res, dPath)
self.cmd(cmd)
return [ver, release, dPath]
def copyVmlinuxDeb(self, name, release):
res = self.cmd("find ./ -name vmlinux*").strip("\n").split('\n')
for r in res:
if os.path.isfile(r) and not r.endswith("decompressor"):
ver = self._genDebVer(name)
dPath = VMPath + release +"/vmlinux-%s" % ver
cmd = "cp %s %s" % (r, dPath)
self.cmd(cmd)
break
return [ver, release, dPath]
def checkProc(self, name, release):
if name.endswith(".rpm"):
ver = self._genRpmVer(name)
elif name.endswith(".ddeb"):
ver = self._genDebVer(name)
else:
return True
funsPath = FuncPath + "%s/funs-%s.txt" % (release, ver)
btfPath = BTFPath + "%s/vmlinux-%s" % (release, ver)
dbPath = DBPath + "%s/info-%s.db" % (release, ver)
vmPath = VMPath + "%s/vmlinux-%s" % (release, ver)
if os.path.exists(dbPath):
return True
if os.path.exists(funsPath) and os.path.exists(btfPath):
self._genDb(ver, release)
return True
if os.path.exists(vmPath):
self.genOthers(ver, release, vmPath)
return True
return False
def _check_mount(self):
if not os.path.exists(f"{PkgPath}flag"):
pass
# self.cmd("sshfs root@47.113.194.53:/root/1ext/down/pkg /home/vmhive/pkg/")
def _proc_work(self, url, name, release):
pkg = f"{PkgPath}{name}"
# self._check_mount()
# if os.path.exists(pkg) and not os.path.exists(f"{pkg}.st"):
# self.cmd(f"cp {pkg} ./")
# else:
self.cmd("axel -n 4 %s" % url)
res = None
if not os.path.exists(name):
raise Exception("failed to get file.")
if name.endswith(".rpm"):
self.drpm(name)
res = self.copyVmlinuxRpm(name, release)
elif name.endswith(".ddeb"):
self.ddeb(name)
res = self.copyVmlinuxDeb(name, release)
# self._check_mount()
# if os.path.exists(pkg) and not os.path.exists(f"{pkg}.st"):
# self.cmd(f"rm -f {pkg}")
return res
def _check_remote_vm(self, name, release):
return None
# self._check_mount()
# if name.endswith(".rpm"):
# ver = self._genRpmVer(name)
# elif name.endswith(".ddeb"):
# ver = self._genDebVer(name)
# else:
# return None
# sPath = RVMPath + "vmlinux-%s" % ver
# if not os.path.exists(sPath):
# return None
# dPath = VMPath + "%s/vmlinux-%s" % (release, ver)
# cmd = "cp %s %s" % (sPath, dPath)
# self.cmd(cmd)
# return [ver, release, dPath]
def proc(self, url, name, release):
if self.checkProc(name, release):
return
res = self._check_remote_vm(name, release)
if res is None:
lastWork = os.getcwd()
self.cmd("rm -rf work")
os.mkdir("work")
os.chdir("work")
res = self._proc_work(url, name, release)
os.chdir(lastWork)
self.cmd("rm -rf work")
if res is not None:
self.genOthers(*res)
def _genBtfHead(self, ver, release, vmPath):
btfPath = BTFPath + "%s/vmlinux-%s" % (release, ver)
headPath = HeadPath + "%s/vmlinux-%s.h" % (release, ver)
self.cmd("cp %s %s" % (vmPath, btfPath))
self.cmd("pahole -J %s" % btfPath)
self.cmd("llvm-objcopy --only-section=.BTF --set-section-flags .BTF=alloc,readonly --strip-all %s" % btfPath)
self.cmd("aarch64-linux-gnu-strip -x %s" % btfPath)
if os.path.exists(f"{btfPath}.btf"):
self.cmd(f"rm -f {btfPath}")
self.cmd(f"mv {btfPath}.btf {btfPath}")
self.system("bpftool btf dump file %s format c > %s" % (btfPath, headPath))
def _getFuns(self, ver, release, vmPath):
funsPath = FuncPath + "%s/funs-%s.txt" % (release, ver)
g = CgetVminfo(vmPath)
g.genFile(funsPath)
def _genDb(self, ver, release):
funsPath = FuncPath + "%s/funs-%s.txt" % (release, ver)
btfPath = BTFPath + "%s/vmlinux-%s" % (release, ver)
dbPath = DBPath + "%s/info-%s.db" % (release, ver)
vmPath = VMPath + "%s/vmlinux-%s" % (release, ver)
print(f"gen {dbPath}")
db = CgenfuncsDb(dbPath)
self.system(f"pahole {btfPath} > struct.txt")
db.funcs(funsPath)
db.structs("struct.txt")
db.types(vmPath)
def genOthers(self, ver, release, vmPath):
self._genBtfHead(ver, release, vmPath)
self._getFuns(ver, release, vmPath)
self._genDb(ver, release)
if __name__ == "__main__":
pass