btf/btfhive/getVmlinux.py (156 lines of code) (raw):
# -*- coding: utf-8 -*-
# cython:language_level=3
"""
-------------------------------------------------
File Name: getVmlinux
Description :
Author : liaozhaoyan
date: 2021/12/1
-------------------------------------------------
Change Activity:
2021/12/1:
-------------------------------------------------
"""
__author__ = 'liaozhaoyan'
import sys
import os
import shlex
from subprocess import PIPE, Popen
from getFuncs import CgetVminfo, CgenfuncsDb
HivePath = "/root/vmhive/"
# VMPath = HivePath + "vmlinux/"
# BTFPath = HivePath + "btf/"
# HeadPath = HivePath + "header/"
# FuncPath = HivePath + "funcs/"
# DBPath = HivePath + "db/"
# PackPath = HivePath + "pkg/"
class CexecCmd(object):
def __init__(self):
super(CexecCmd, self).__init__()
@staticmethod
def cmd(cmds):
p = Popen(shlex.split(cmds), stdout=PIPE)
return p.stdout.read().decode().strip()
@staticmethod
def system(cmds):
cmds = cmds.replace('\0', '').strip()
return os.system(cmds)
class CgetVmlinux(CexecCmd):
def __init__(self):
super(CgetVmlinux, self).__init__()
def drpm(self, path):
self.system("rpm2cpio %s|cpio -id" % path)
def ddeb(self, path):
self.cmd("ar x %s" % path)
if os.path.exists("data.tar.xz"):
self.cmd("xz -d data.tar.xz")
self.cmd("tar xf data.tar")
else:
self.cmd("tar -I zstd -xvf data.tar.zst")
def _genRpmVer(self, name):
_, _, n = name.split("-", 2)
ver, _ = n.rsplit(".", 1)
return ver
#linux-image-3.13.0-96-generic-dbgsym_3.13.0-96.143_i386.ddeb -> 3.13.0-96-generic
def _genDebVer(self, name):
if name.startswith("linux-image-unsigned"):
_, _, _, n = name.split("-", 3)
else:
_, _, n = name.split("-", 2)
ver, _ = n.split("-dbgsym", 1)
return ver
def _copyVmlinuxRpm(self, name, release, arch):
res = self.cmd("find ./ -name vmlinux").strip("\n")
ver = self._genRpmVer(name)
VMPath = HivePath + arch + "/vmlinux/"
dPath = VMPath + "%s/vmlinux-%s" % (release, ver)
cmd = "cp %s %s" % (res, dPath)
self.cmd(cmd)
return [ver, release, dPath, arch]
def _copyVmlinuxDeb(self, name, release, arch):
res = self.cmd("find ./ -name vmlinux*").strip("\n").split('\n')
for r in res:
if os.path.isfile(r) and not r.endswith("decompressor"):
ver = self._genDebVer(name)
VMPath = HivePath + arch + "/vmlinux/"
dPath = VMPath + release +"/vmlinux-%s" % ver
cmd = "cp %s %s" % (r, dPath)
self.cmd(cmd)
break
return [ver, release, dPath, arch]
def _checkProc(self, name, release, arch):
if name.endswith(".rpm"):
ver = self._genRpmVer(name)
elif name.endswith(".ddeb"):
ver = self._genDebVer(name)
else:
return True
DBPath = HivePath + arch + "/db/"
dbPath = DBPath + "%s/info-%s.db" % (release, ver)
dbJournal = dbPath + "-journal"
PackPath = HivePath + arch + "/pack/"
packName = f"{PackPath}/{release}/{name}"
if os.path.exists(dbPath):
if not os.path.exists(dbJournal):
return True
if os.path.exists(packName):
workPath = self._prevProc()
self.cmd(f"cp {packName} ./")
res = self.__proc_work(name, release, arch)
self._afterProc(workPath, res)
return True
return False
def __proc_work(self, name, release, arch):
res = None
if not os.path.exists(name):
raise Exception("failed to get file.")
if name.endswith(".rpm"):
self.drpm(name)
res = self._copyVmlinuxRpm(name, release, arch)
elif name.endswith(".ddeb"):
self.ddeb(name)
res = self._copyVmlinuxDeb(name, release, arch)
return res
def _proc_work(self, url, name, release, arch):
self.cmd("axel -n 4 %s" % url)
PackPath = HivePath + arch + "/pack/"
self.cmd(f"cp {name} {PackPath}/{release}/")
return self.__proc_work(name, release, arch)
def _proc_kos(self, db):
db.parse_kos('./')
def _prevProc(self):
lastWork = os.path.abspath(os.getcwd())
self.cmd(f"rm -rf {os.getpid()}")
os.mkdir(f"{os.getpid()}")
os.chdir(f"{os.getpid()}")
return lastWork
def _afterProc(self, lastWork, res):
db = None
if res is not None:
db = self.genOthers(*res)
if db is not None:
vmPath = res[2]
try:
db.pasrseVmLinux(vmPath)
self._proc_kos(db)
except OSError as e:
print(f"parse {vmPath} report {e}")
os.chdir(lastWork)
self.cmd(f"rm -rf {os.getpid()}")
def proc(self, url, name, release, arch):
print(f"proc {name}, {arch}")
if self._checkProc(name, release, arch):
return
lastWork = self._prevProc()
res = self._proc_work(url, name, release, arch)
self._afterProc(lastWork, res)
def _genBtfHead(self, ver, release, vmPath, arch):
BTFPath = HivePath + arch + "/btf/"
HeadPath = HivePath + arch + "/head/"
btfPath = BTFPath + "%s/vmlinux-%s" % (release, ver)
headPath = HeadPath + "%s/vmlinux-%s.h" % (release, ver)
self.cmd("cp %s %s" % (vmPath, btfPath))
self.cmd("pahole -J %s" % btfPath)
self.cmd("llvm-objcopy --only-section=.BTF --set-section-flags .BTF=alloc,readonly --strip-all %s" % btfPath)
if arch == 'aarch64':
self.cmd("aarch64-linux-gnu-strip -x %s" % btfPath)
else:
self.cmd("strip -x %s" % btfPath)
if os.path.exists(f"{btfPath}.btf"):
self.cmd(f"rm -f {btfPath}")
self.cmd(f"mv {btfPath}.btf {btfPath}")
self.system("bpftool btf dump file %s format c > %s" % (btfPath, headPath))
def _getFuns(self, ver, release, vmPath, arch):
pass
def _genDb(self, ver, release, arch):
DBPath = HivePath + arch + "/db/"
dbPath = DBPath + "%s/info-%s.db" % (release, ver)
print(f"gen {dbPath}")
db = CgenfuncsDb(dbPath)
return db
def genOthers(self, ver, release, vmPath, arch):
self._genBtfHead(ver, release, vmPath, arch)
self._getFuns(ver, release, vmPath, arch)
return self._genDb(ver, release, arch)
if __name__ == "__main__":
pass