in metaflow/extension_support/__init__.py [0:0]
def find_spec(self, fullname, path, target=None):
# If we are trying to load a shadowed module (ending in ._orig), we don't
# say we handle it
# _ext_debug(
# "Looking for %s in %s with target %s" % (fullname, str(path), target)
# )
if any([fullname.startswith(e) for e in self._temp_excluded_prefix]):
return None
# If this is something we directly handle, return our loader
if fullname in self._handled:
return importlib.util.spec_from_loader(
fullname, _AliasLoader(fullname, self._handled[fullname])
)
# For the first pass when we try to load a shadowed module, we send it back
# without the ._orig and that will find the original spec of the module
# Note that we handle mymodule._orig.orig_submodule as well as mymodule._orig.
# Basically, the original module and any of the original submodules are
# available under _orig.
name_parts = fullname.split(".")
try:
orig_idx = name_parts.index("_orig")
except ValueError:
orig_idx = -1
if orig_idx > -1 and ".".join(name_parts[:orig_idx]) in self._handled:
orig_name = ".".join(name_parts[:orig_idx] + name_parts[orig_idx + 1 :])
parent_name = None
if orig_idx != len(name_parts) - 1:
# We have a parent module under the _orig portion so for example, if
# we load mymodule._orig.orig_submodule, our parent is mymodule._orig.
# However, since mymodule is currently shadowed, we need to reset
# the parent module properly. We know it is already loaded (since modules
# are loaded hierarchically)
parent_name = ".".join(
name_parts[:orig_idx] + name_parts[orig_idx + 1 : -1]
)
_ext_debug("Looking for original module '%s'" % orig_name)
prefix = ".".join(name_parts[:orig_idx])
self._temp_excluded_prefix.add(prefix)
# We also have to remove the module temporarily while we look for the
# new spec since otherwise it returns the spec of that loaded module.
# module is also restored *after* we call `create_module` in the loader
# otherwise it just returns None. We also swap out the parent module so that
# the search can start from there.
loaded_module = sys.modules.get(orig_name)
if loaded_module:
del sys.modules[orig_name]
parent_module = sys.modules.get(parent_name) if parent_name else None
if parent_module:
sys.modules[parent_name] = sys.modules[".".join([parent_name, "_orig"])]
# This finds the spec that would have existed had we not added all our
# _LazyFinders
spec = importlib.util.find_spec(orig_name)
self._temp_excluded_prefix.remove(prefix)
if not spec:
return None
if spec.submodule_search_locations:
self._orig_search_paths.update(spec.submodule_search_locations)
_ext_debug("Found original spec %s" % spec)
# Change the spec
spec.loader = _OrigLoader(
fullname,
spec.loader,
loaded_module,
parent_module,
)
return spec
for p in path or []:
if p in self._orig_search_paths:
# We need to look in some of the "_orig" modules
orig_override_name = ".".join(
name_parts[:-1] + ["_orig", name_parts[-1]]
)
_ext_debug(
"Looking for %s as an original module: searching for %s"
% (fullname, orig_override_name)
)
return importlib.util.find_spec(orig_override_name)
if len(name_parts) > 1:
# This checks for submodules of things we handle. We check for the most
# specific submodule match and use that
chop_idx = 1
while chop_idx < len(name_parts):
parent_name = ".".join(name_parts[:-chop_idx])
if parent_name in self._handled:
orig = self._handled[parent_name]
if isinstance(orig, types.ModuleType):
orig_name = ".".join(
[orig.__orig_name__] + name_parts[-chop_idx:]
)
else:
orig_name = ".".join([orig] + name_parts[-chop_idx:])
return importlib.util.spec_from_loader(
fullname, _AliasLoader(fullname, orig_name)
)
chop_idx += 1
return None