def find_spec()

in metaflow/extension_support/__init__.py [0:0]


    def find_spec(self, fullname, path, target=None):
        # If we are trying to load a shadowed module (ending in ._orig), we don't
        # say we handle it
        # _ext_debug(
        #    "Looking for %s in %s with target %s" % (fullname, str(path), target)
        # )
        if any([fullname.startswith(e) for e in self._temp_excluded_prefix]):
            return None

        # If this is something we directly handle, return our loader
        if fullname in self._handled:
            return importlib.util.spec_from_loader(
                fullname, _AliasLoader(fullname, self._handled[fullname])
            )

        # For the first pass when we try to load a shadowed module, we send it back
        # without the ._orig and that will find the original spec of the module
        # Note that we handle mymodule._orig.orig_submodule as well as mymodule._orig.
        # Basically, the original module and any of the original submodules are
        # available under _orig.
        name_parts = fullname.split(".")
        try:
            orig_idx = name_parts.index("_orig")
        except ValueError:
            orig_idx = -1
        if orig_idx > -1 and ".".join(name_parts[:orig_idx]) in self._handled:
            orig_name = ".".join(name_parts[:orig_idx] + name_parts[orig_idx + 1 :])
            parent_name = None
            if orig_idx != len(name_parts) - 1:
                # We have a parent module under the _orig portion so for example, if
                # we load mymodule._orig.orig_submodule, our parent is mymodule._orig.
                # However, since mymodule is currently shadowed, we need to reset
                # the parent module properly. We know it is already loaded (since modules
                # are loaded hierarchically)
                parent_name = ".".join(
                    name_parts[:orig_idx] + name_parts[orig_idx + 1 : -1]
                )
            _ext_debug("Looking for original module '%s'" % orig_name)
            prefix = ".".join(name_parts[:orig_idx])
            self._temp_excluded_prefix.add(prefix)
            # We also have to remove the module temporarily while we look for the
            # new spec since otherwise it returns the spec of that loaded module.
            # module is also restored *after* we call `create_module` in the loader
            # otherwise it just returns None. We also swap out the parent module so that
            # the search can start from there.
            loaded_module = sys.modules.get(orig_name)
            if loaded_module:
                del sys.modules[orig_name]
            parent_module = sys.modules.get(parent_name) if parent_name else None
            if parent_module:
                sys.modules[parent_name] = sys.modules[".".join([parent_name, "_orig"])]

            # This finds the spec that would have existed had we not added all our
            # _LazyFinders
            spec = importlib.util.find_spec(orig_name)

            self._temp_excluded_prefix.remove(prefix)

            if not spec:
                return None

            if spec.submodule_search_locations:
                self._orig_search_paths.update(spec.submodule_search_locations)

            _ext_debug("Found original spec %s" % spec)

            # Change the spec
            spec.loader = _OrigLoader(
                fullname,
                spec.loader,
                loaded_module,
                parent_module,
            )

            return spec

        for p in path or []:
            if p in self._orig_search_paths:
                # We need to look in some of the "_orig" modules
                orig_override_name = ".".join(
                    name_parts[:-1] + ["_orig", name_parts[-1]]
                )
                _ext_debug(
                    "Looking for %s as an original module: searching for %s"
                    % (fullname, orig_override_name)
                )
                return importlib.util.find_spec(orig_override_name)
        if len(name_parts) > 1:
            # This checks for submodules of things we handle. We check for the most
            # specific submodule match and use that
            chop_idx = 1
            while chop_idx < len(name_parts):
                parent_name = ".".join(name_parts[:-chop_idx])
                if parent_name in self._handled:
                    orig = self._handled[parent_name]
                    if isinstance(orig, types.ModuleType):
                        orig_name = ".".join(
                            [orig.__orig_name__] + name_parts[-chop_idx:]
                        )
                    else:
                        orig_name = ".".join([orig] + name_parts[-chop_idx:])
                    return importlib.util.spec_from_loader(
                        fullname, _AliasLoader(fullname, orig_name)
                    )
                chop_idx += 1
        return None