lcc/pylcc/lib/lbcBase.py (400 lines of code) (raw):
# -*- coding: utf-8 -*-
# cython:language_level=3
"""
-------------------------------------------------
File Name: lbcBase
Description :
Author : liaozhaoyan
date: 2021/7/20
-------------------------------------------------
Change Activity:
2021/7/20:
-------------------------------------------------
"""
__author__ = 'liaozhaoyan'
import sys
import os
import json
import hashlib
import signal
import cffi
from threading import Thread
from multiprocessing import cpu_count
from pylcc.lbcMaps import mapsDict
from surftrace.execCmd import CexecCmd
from surftrace.surfException import InvalidArgsException, RootRequiredException, FileNotExistException, DbException
from surftrace.lbcClient import ClbcClient, segDecode
from surftrace.surfCommon import taskList
from surftrace.uprobeParser import CuprobeParser
from pylcc.lbcInclude import ClbcInclude
from pylcc.perfEvent import *
LBC_COMPILE_PORT = 7655
def getCwd(pathStr):
return os.path.split(os.path.realpath(pathStr))[0]
class ClbcLoad(object):
def __init__(self, bpf, bpf_str="",
server="pylcc.openanolis.cn",
arch="", ver="", env="",
workPath=None, incPath=None,
logLevel=-1, btf=True,
opt="so",
):
if "LBC_SERVER" in os.environ:
server = os.environ["LBC_SERVER"]
if "LBC_LOGLEVEL" in os.environ:
logLevel = int(os.environ["LBC_LOGLEVEL"])
if workPath:
self._wPath = workPath
else:
self._wPath = os.getcwd()
self._incPath = incPath
super(ClbcLoad, self).__init__()
self._ffi = cffi.FFI()
self._so = None
self._need_deinit = False
self._server = server
self._c = CexecCmd()
if btf:
self._checkRoot()
self._env = env
self._logLevel = logLevel
if ver == "":
ver = self._c.cmd('uname -r')
if arch == "":
arch = self._c.cmd('uname -m')
if btf:
self._checkBtf(ver, arch)
if bpf.endswith(".bpf.c"):
bpf = bpf[:-6]
if opt == "so":
self._getSo(bpf, bpf_str, ver, arch)
elif opt == "obj":
self._compileObj(bpf, bpf_str, ver, arch)
elif opt == "combine":
pass
def __del__(self):
if self._so:
self._closeSo()
def _deinitSo(self):
self._checkSo()
self._so.lbc_bpf_exit()
self._need_deinit = False
def _closeSo(self):
if self._need_deinit:
self._deinitSo()
self._ffi.dlclose(self._so)
def _checkBtf(self, ver, arch):
if os.path.exists('/sys/kernel/btf/vmlinux'):
return
name = "/boot/vmlinux-%s" % ver
if not os.path.exists(name):
cli = ClbcClient(server=self._server, ver=ver, arch=arch)
dRecv = cli.getBtf()
if dRecv['btf'] is None:
print("get btf failed, log is:\n%s" % dRecv['log'])
raise InvalidArgsException("get btf failed.")
print("get btf from remote success.")
with open(name, 'wb') as f:
f.write(segDecode(dRecv['btf']))
def _setupSoName(self, bpf):
return self._wPath + '/' + bpf + ".so"
def _setUpCode(self, bpf, s):
if s == "":
bpf_c = self._wPath + '/' + bpf + ".bpf.c"
if os.path.exists(bpf_c):
with open(bpf_c, "r") as f:
s = f.read()
def _getSo(self, bpf, s, ver, arch):
bpf_so = self._setupSoName(bpf)
if s == "":
bpf_c = self._wPath + '/' + bpf + ".bpf.c"
if os.path.exists(bpf_c):
with open(bpf_c, "r") as f:
s = f.read()
need = self._checkStrCompile(s, bpf_so, ver, arch)
if need:
self._compileSo(s, bpf_so, ver, arch)
def _checkCCompile(self, bpf_c, bpf_so, ver, arch):
cFlag = os.path.exists(bpf_c)
oFlag = os.path.exists(bpf_so)
if not (cFlag or oFlag): # is not exist
raise FileNotExistException("bpf.c or so is not in this dictionary.")
elif not oFlag and cFlag: # only bpf.c
return True
elif oFlag and not cFlag: # only so, should check version
if self._checkVer(bpf_so, ver, arch):
raise FileNotExistException("bad bpf.so and not bpf.c")
return False
else: # both bpf.c and bo, check hash and version
with open(bpf_c, "r") as f:
s = f.read()
s += self._env
if sys.version_info.major >= 3:
cHash = hashlib.sha256(s.encode()).hexdigest()
else:
cHash = hashlib.sha256(s).hexdigest()
if self._checkHash(bpf_so, cHash):
return True
return self._checkVer(bpf_so, ver, arch)
def _checkStrCompile(self, s, bpf_so, ver, arch):
oFlag = os.path.exists(bpf_so)
if not oFlag: # only string
return True
elif s == "": # only so, no string.
return False
else: # both bpf.c and bo, check hash and version
s = self._combineSource(s)
s += self._env
if sys.version_info.major >= 3:
cHash = hashlib.sha256(s.encode()).hexdigest()
else:
cHash = hashlib.sha256(s).hexdigest()
if self._checkHash(bpf_so, cHash):
return True
return self._checkVer(bpf_so, ver, arch)
def _parseVer(self, ver):
major, minor, _ = ver.split(".", 2)
return major
def _checkVer(self, bpf_so, ver, arch):
"""if should compile return ture, else return false"""
try:
self._so = self._ffi.dlopen(bpf_so)
except (OSError, FileNotFoundError):
return True
soVer = self._loadDesc()['kern_version']
self._closeSo()
soMajor = self._parseVer(soVer)
hMajor = self._parseVer(ver)
return (int(soMajor) > 3) ^ (int(hMajor) > 3)
def _checkHash(self, bpf_so, cHash):
"""if should compile return ture, else return false"""
try:
self._so = self._ffi.dlopen(bpf_so)
except (OSError, FileNotFoundError):
return True
self._ffi.cdef(self._cdef(), override=True)
soHash = self._loadDesc()['hash']
self._closeSo()
return not cHash == soHash
def _checkRoot(self):
cmd = 'whoami'
line = self._c.cmd(cmd).strip()
if line != "root":
raise RootRequiredException('this app need run as root')
def _combineSource(self, s):
inc = ClbcInclude(self._wPath, self._incPath)
return inc.parse(s)
def _compileSo(self, s, bpf_so, ver, arch):
cli = ClbcClient(server=self._server, ver=ver, arch=arch, port=LBC_COMPILE_PORT)
dRecv = cli.getC(s, self._env)
if dRecv is None:
raise Exception("receive error")
if dRecv['so'] is None:
print("compile failed, log is:\n%s" % dRecv['clog'])
raise InvalidArgsException("compile failed.")
print("remote server compile success.")
with open(bpf_so, 'wb') as f:
f.write(segDecode(dRecv['so']))
def _compileObj(self, bpf, bpf_str, ver, arch):
if bpf_str == "":
cName = bpf + ".bpf.c"
if not os.path.exists(cName):
raise InvalidArgsException("file %s is not exist." % cName)
with open(cName, 'r') as f:
bpf_str = f.read()
objName = bpf + ".bpf.o"
cli = ClbcClient(server=self._server, ver=ver, arch=arch, port=LBC_COMPILE_PORT)
s = self._combineSource(bpf_str)
dRecv = cli.getObj(s, self._env)
if dRecv is None:
raise Exception("receive error")
if dRecv['obj'] is None:
print("compile failed, log is:\n%s" % dRecv['clog'])
raise InvalidArgsException("compile failed.")
print("remote server compile success.")
with open(objName, 'wb') as f:
f.write(segDecode(dRecv['obj']))
def _loadSo(self, bpf_so):
self._so = self._ffi.dlopen(bpf_so)
self._ffi.cdef(self._cdef(), override=True)
@staticmethod
def _cdef():
return """
int lbc_bpf_init(int log_level, int attach);
void lbc_bpf_exit(void);
int lbc_bpf_get_maps_id(char* event);
int lbc_set_event_cb(int id,
void (*cb)(void *ctx, int cpu, void *data, unsigned int size),
void (*lost)(void *ctx, int cpu, unsigned long long cnt));
int lbc_event_loop(int id, int timeout);
int lbc_map_lookup_elem(int id, const void *key, void *value);
int lbc_map_lookup_elem_flags(int id, void *key, void *value, unsigned long int);
int lbc_map_lookup_and_delete_elem(int id, void *key, void *value);
int lbc_map_delete_elem(int id, void *key);
int lbc_map_update_elem(int id, void *key, void *value);
int lbc_map_get_next_key(int id, void *key, void *next_key);
int lbc_attach_perf_event(char* func, int pfd);
int lbc_attach_kprobe(char* func, char* sym);
int lbc_attach_kretprobe(char* func, char* sym);
int lbc_attach_uprobe(char* func, int pid, char *binary_path, unsigned long func_offset);
int lbc_attach_uretprobe(char* func, int pid, char *binary_path, unsigned long func_offset);
int lbc_attach_tracepoint(char* func, char *tp_category, char *tp_name);
int lbc_attach_raw_tracepoint(char* func, char *tp_name);
int lbc_attach_cgroup(char* func, int cgroup_fd);
int lbc_attach_netns(char* func, int netns);
int lbc_attach_xdp(char* func, int ifindex);
char* lbc_get_map_types(void);
"""
def _checkSo(self):
if not self._so:
raise InvalidArgsException("so not setup.")
def _loadDesc(self):
self._checkSo()
desc = self.c2str(self._so.lbc_get_map_types())
return json.loads(desc)
def _initSo(self, attach=1):
self._checkSo()
r = self._so.lbc_bpf_init(self._logLevel, attach)
if r != 0:
raise InvalidArgsException("so init failed")
self._need_deinit = True
def c2str(self, data):
return self._ffi.string(data)
@staticmethod
def c2list(data):
arr = []
for i in range(len(data)):
arr.append(data[i])
return arr
class ClbcBase(ClbcLoad):
def __init__(self, bpf, bpf_str="",
server="pylcc.openanolis.cn",
arch="", ver="", env="",
attach=1, workPath=None
):
super(ClbcBase, self).__init__(bpf, bpf_str, server, arch, ver,
env, workPath=workPath)
bpf_so = self._setupSoName(bpf)
self._loadSo(bpf_so)
self._initSo(attach)
self.maps = {}
self._loadMaps()
self._cbInterrupt = None
def so(self):
return self._so
def _loadMaps(self):
d = self._loadDesc()
self._ffi.cdef("\n".join([d['ffi'], self._cdef()]), override=True)
tDict = mapsDict
dMaps = d['maps']
for k in dMaps.keys():
t = dMaps[k]['type']
if t in tDict:
self.maps[k] = tDict[t](self._so, k, dMaps[k], self._ffi)
else:
raise InvalidArgsException("bad type: %s, key: %s" % (t, k))
def getMap(self, name, data, size):
try:
return self.maps[name].event(data)
except IndexError:
return None
# https://man7.org/linux/man-pages/man2/perf_event_open.2.html
def attachPerfEvent(self, function, attrD, pid=0, cpu=-1, group_fd=-1, flags=0):
for k, v in attrD.items(): # json int type not support 64 bit
if type(v) is not str:
try:
attrD[k] = "%d" % v
except TypeError:
print("key %s type is %s, not support, skip." % (k, type(v)))
del attrD[k]
attrs = json.dumps(attrD)
res = self._so.lbc_attach_perf_event(function.encode(), attrs.encode(), pid, cpu, group_fd, flags)
if res != 0:
raise InvalidArgsException("attach %s to perf event failed." % function)
return res
def attachAllCpuPerf(self, function, attrD, pid=-1, group_fd=-1, flags=0):
nr_cpu = cpu_count()
for i in range(nr_cpu):
self.attachPerfEvent(function, attrD, pid=pid, cpu=i, group_fd=group_fd, flags=flags)
def attachPerfEvents(self, function, attrD, pid, group_fd=-1, flags=0):
p = taskList(pid)
for pthread in p.threads():
self.attachPerfEvent(function, attrD, pid=pthread.id, cpu=-1, group_fd=group_fd, flags=flags)
def attachJavaSym(self, function, pid, symbol):
pFile = "/tmp/perf-%d.map" % pid
if not os.path.exists(pFile):
raise InvalidArgsException("can not find java maps for pid %d." % pid)
syms = []
with open(pFile, 'r') as f:
for line in f.readlines():
start, size, sym = line.split(' ', 2)
d = {"start": int(start, 16),
"size": int(size, 16),
'sym': sym.strip(),
}
syms.append(d)
res = None
for symd in syms:
if symd['sym'] == symbol:
res = symd
break
if res is None:
raise InvalidArgsException("symbol %s is not in map." % symbol)
addr = res["start"] + 0x20
pfConfig = {
"type": PerfType.BREAKPOINT,
"size": PERF_ATTR_SIZE_VER5,
"sample_period": 1,
"precise_ip": 2,
"wakeup_events": 1,
"bp_type": PerfBreakPointType.X,
"bp_addr": addr,
"bp_len": 8,
}
self.attachPerfEvents(function, pfConfig, pid)
def attachKprobe(self, function, symbol):
res = self._so.lbc_attach_kprobe(function, symbol)
if res != 0:
raise InvalidArgsException("attach %s to kprobe %s failed." % (function, symbol))
def attachKretprobe(self, function, symbol):
res = self._so.lbc_attach_kretprobe(function, symbol)
if res != 0:
raise InvalidArgsException("attach %s to kretprobe %s failed." % (function, symbol))
def attachUprobe(self, function, pid, binaryPath, offset=0):
res = self._so.lbc_attach_uprobe(function, pid, binaryPath, offset)
if res != 0:
raise InvalidArgsException("attach %s to uprobe %s failed." % (function, binaryPath))
def attachUprobes(self, function, pid, binaryPath, offset=0):
if pid > 0:
p = taskList(pid)
for pthread in p.threads():
self.attachUprobe(function, pthread.id, binaryPath, offset)
else:
self.attachUprobe(function, pid, binaryPath, offset)
def attachUretprobe(self, function, pid, binaryPath, offset=0):
res = self._so.lbc_attach_uretprobe(function, pid, binaryPath, offset)
if res != 0:
raise InvalidArgsException("attach %s to uretprobe %s failed." % (function, binaryPath))
def attachUretprobes(self, function, pid, binaryPath, offset=0):
if pid > 0:
p = taskList(pid)
for pthread in p.threads():
self.attachUretprobe(function, pthread.id, binaryPath, offset)
else:
self.attachUretprobe(function, pid, binaryPath, offset)
def traceUprobes(self, function, pid, fxpr):
binaryPath, func = fxpr.split(":", 1)
parser = CuprobeParser(binaryPath)
fullPath = parser.fullObj()
offset = int(parser.funAddr(func), 16)
print(fullPath, offset)
self.attachUprobes(function, pid, fullPath, offset)
def traceUretprobes(self, function, pid, fxpr):
binaryPath, func = fxpr.split(":", 1)
parser = CuprobeParser(binaryPath)
fullPath = parser.fullObj()
offset = parser.funAddr(func)
self.attachUretprobes(function, pid, fullPath, offset)
def attachTracepoint(self, function, category, name):
res = self._so.lbc_attach_tracepoint(function, category, name)
if res != 0:
raise InvalidArgsException("attach %s to trace point %s failed." % (function, name))
def attachRawTracepoint(self, function, name):
res = self._so.lbc_attach_raw_tracepoint(function, name)
if res != 0:
raise InvalidArgsException("attach %s to raw trace point %s failed." % (function, name))
def attachCgroup(self, function, fd):
res = self._so.lbc_attach_cgroup(function, fd)
if res != 0:
raise InvalidArgsException("attach %s to cgroup %d failed." % (function, fd))
def attachNetns(self, function, fd):
res = self._so.lbc_attach_netns(function, fd)
if res != 0:
raise InvalidArgsException("attach %s to netns %d failed." % (function, fd))
def attachXdp(self, function, ifindex):
res = self._so.lbc_attach_xdp(function, ifindex)
if res != 0:
raise InvalidArgsException("attach %s to xdp %d failed." % (function, ifindex))
def _signalInterrupt(self, signum, frame):
self._cbInterrupt()
def waitInterrupt(self, cb=None):
if cb:
self._cbInterrupt = cb
signal.signal(signal.SIGINT, self._signalInterrupt)
signal.pause()
class CeventThread(Thread):
def __init__(self, lbc, event, cb, lost=None):
super(CeventThread, self).__init__()
self.setDaemon(True)
self._lbc = lbc
self._event = event
self._cb = cb
self.lost = lost
self.start()
def cb(self, cpu, data, size):
e = self._lbc.getMap(self._event, data, size)
self._cb(cpu, e)
def run(self):
self._lbc.maps[self._event].open_perf_buffer(self.cb, lost=self.lost)
self._lbc.maps[self._event].perf_buffer_poll()
if __name__ == "__main__":
pass