docker_images/pythonv2/wrapper/python_glue/leak_check.py (151 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import gc
import inspect
import os
import weakref
import time
import logging
import importlib
logger = logging.getLogger(__name__)
def _run_garbage_collection():
"""
Collect everything until there's nothing more to collect
"""
sleep_time = 2
done = False
while not done:
collected = gc.collect(2)
logger.info("{} objects collected".format(collected))
if collected:
logger.info("Sleeping for {} seconds".format(sleep_time))
time.sleep(sleep_time)
else:
done = True
def _dump_referrers(obj):
referrers = gc.get_referrers(obj.weakref())
for referrer in referrers:
if isinstance(referrer, dict):
print(" dict: {}".format(referrer))
for sub_referrer in gc.get_referrers(referrer):
if sub_referrer != referrers:
if not inspect.ismodule(sub_referrer):
print(
" used by: {}:{}".format(
type(sub_referrer), sub_referrer
)
)
elif not isinstance(referrer, type) and not inspect.ismodule(referrer):
print(" used by: {}:{}".format(type(referrer), referrer))
class RefObject(object):
"""
Object holding details on the leak of some tracked object
"""
def __init__(self, obj):
self.value = str(obj)
self.weakref = weakref.ref(obj)
def __repr__(self):
return self.value
def __eq__(self, obj):
return self.weakref == obj.weakref
def __ne__(self, obj):
return not self == obj
class TrackedModule(object):
def __init__(self, module_name):
self.module_name = module_name
mod = importlib.import_module(module_name)
self.path = os.path.dirname(inspect.getsourcefile(mod))
def is_module_object(self, obj):
if not isinstance(obj, BaseException):
try:
c = obj.__class__
source_file = inspect.getsourcefile(c)
except (TypeError, AttributeError):
pass
else:
if source_file and source_file.startswith(self.path):
return True
return False
class LeakTracker(object):
def __init__(self):
self.tracked_modules = []
self.previous_leaks = []
def add_tracked_module(self, module_name):
self.tracked_modules.append(TrackedModule(module_name))
def _get_all_tracked_objects(self):
"""
Query the garbage collector for a a list of all objects that
are implemented in tracked libraries
"""
all = []
for obj in gc.get_objects():
if any([mod.is_module_object(obj) for mod in self.tracked_modules]):
source_file = inspect.getsourcefile(obj.__class__)
try:
all.append(RefObject(obj))
except TypeError:
logger.warning(
"Could not add {} from {} to leak list".format(
obj.__class__, source_file
)
)
return all
def _prune_previous_leaks_list(self):
"""
remove objects from our list of previous leaks if they've been collected
"""
new_previous_leaks = []
for obj in self.previous_leaks:
if obj.weakref():
new_previous_leaks.append(obj)
else:
logger.info(
"Object {} collected since last test. Removing from previous_leaks list.".format(
obj
)
)
logger.info(
"previous leaks pruned from {} items to {} items".format(
len(self.previous_leaks), len(new_previous_leaks)
)
)
self.previous_leaks = new_previous_leaks
def _filter_previous_leaks(self, all):
"""
Return a filtered leak list where all previously reported leaks have been removed.
"""
self._prune_previous_leaks_list()
new_list = []
for obj in all:
if obj not in self.previous_leaks:
new_list.append(obj)
else:
logger.info("Object {} previously reported".format(obj))
logger.info(
"active list pruned from {} items to {} items".format(
len(all), len(new_list)
)
)
return new_list
def set_baseline(self):
self.previous_leaks = self._get_all_tracked_objects()
def check_for_new_leaks(self):
"""
Get all tracked objects from the garbage collector. If any objects remain, list
them and assert so the test fails.
"""
_run_garbage_collection()
all_tracked_objects = self._get_all_tracked_objects()
all_tracked_objects = self._filter_previous_leaks(all_tracked_objects)
if len(all_tracked_objects):
logger.error(
"Test failure. {} objects have leaked:".format(
len(all_tracked_objects)
)
)
count = 0
for obj in all_tracked_objects:
count += 1
if count <= 100:
logger.error("LEAK: {}".format(obj))
_dump_referrers(obj)
self.previous_leaks.append(obj)
if count < len(all_tracked_objects):
logger.errer(
"and {} more objects".format(len(all_tracked_objects) - count)
)
referrers = self.get_referrers(all_tracked_objects) # noqa: F841
assert False
else:
logger.info("No leaks")
def get_referrers(self, objects):
"""
Get all referrers for all objects as a way to see why objects are leaking.
Meant to be run inside a debugger, probably using pprint on the output
"""
all_referrers = []
index = 0
for obj in objects:
referrers = []
for ref in gc.get_referrers(obj.weakref()):
if type(ref) in [dict] or str(type(ref)) in ["<class 'cell'>"]:
referrers.append(ref)
else:
referrers.append(RefObject(ref))
all_referrers.append({"index": index, "obj": obj, "referrers": referrers})
index += 1
return all_referrers