def _get_extension_packages()

in metaflow/extension_support/__init__.py


def _get_extension_packages(ignore_info_file=False, restrict_to_directories=None):
    if not _mfext_supported:
        _ext_debug("Not supported for your Python version -- 3.4+ is needed")
        return {}, {}

    # If we have an INFO file with the appropriate information (for example when
    # running from a saved code package), we use it directly; it contains results
    # pre-computed over _extension_points.
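    # A sketch of the expected "ext_info" shape (extension names here are
    # hypothetical); it mirrors the (all_pkg, ext_to_pkg) pair returned at the end
    # of this function:
    #   ext_info[0]: {"my-ext-dist": {"root_paths": [...], "meta_module": None,
    #                                 "files": [...], "version": "1.0.0"}}
    #   ext_info[1]: {"plugins": [["my-ext-dist", "org_x",
    #                              "metaflow_extensions.org_x.plugins.mfextinit_org_x"]]}
    # where each inner list is re-hydrated into a MFExtPackage below.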
    info_content = read_info_file()
    if not ignore_info_file and info_content:
        all_pkg, ext_to_pkg = info_content.get("ext_info", (None, None))
        if all_pkg is not None and ext_to_pkg is not None:
            _ext_debug("Loading pre-computed information from INFO file")
            # Convert the serialized entries in ext_to_pkg back into MFExtPackage objects
            for k, v in ext_to_pkg.items():
                v = [MFExtPackage(*d) for d in v]
                ext_to_pkg[k] = v
            return all_pkg, ext_to_pkg

    # If no explicit restriction was given, default to the configured extension
    # search directories (if any)
    if restrict_to_directories is None and EXTENSIONS_SEARCH_DIRS != [""]:
        restrict_to_directories = EXTENSIONS_SEARCH_DIRS

    # Check if we even have extensions
    try:
        extensions_module = importlib.import_module(EXT_PKG)
    except ImportError as e:
        if _py_ver >= (3, 6):
            # e.name is set to the name of the package that fails to load
            # so don't error ONLY IF the error is importing this module (but do
            # error if there is a transitive import error)
            if not (isinstance(e, ModuleNotFoundError) and e.name == EXT_PKG):
                raise
        return {}, {}

    if restrict_to_directories:
        restrict_to_directories = [
            Path(p).resolve().as_posix() for p in restrict_to_directories
        ]

    # There are two "types" of packages:
    #   - those installed on the system (distributions)
    #   - those present in the PYTHONPATH
    # We have more information on distributions (including dependencies) and more
    # effective ways to get file information from them (they include the full list of
    # files installed) so we treat them separately from packages purely in PYTHONPATH.
    # They are also the more likely way that users will have extensions present, so
    # we optimize for that case.

    # At this point, we collect all the paths into a set. As we find distributions
    # that match a path, we remove it from the set; whatever remains corresponds to
    # PYTHONPATH "packages".
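    # As an example (hypothetical names): a pip-installed distribution "my-ext-dist"
    # shipping metaflow_extensions/org_x/... is handled in step 1 below, while a
    # PYTHONPATH directory containing metaflow_extensions/org_y/... is picked up in
    # the second pass over whatever paths remain in this set.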
    all_paths = set(Path(p).resolve().as_posix() for p in extensions_module.__path__)
    _ext_debug("Found packages present at %s" % str(all_paths))
    if restrict_to_directories:
        _ext_debug(
            "Processed packages will be restricted to %s" % str(restrict_to_directories)
        )

    list_ext_points = [x.split(".") for x in _extension_points]
    init_ext_points = [x[0] for x in list_ext_points]
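    # As an illustration, an extension point like "plugins.cards" becomes
    # ["plugins", "cards"] in list_ext_points and contributes "plugins" to
    # init_ext_points (the first-level names reserved for extension points).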

    # NOTE: For distribution packages, we will rely on requirements to determine the
    # load order of extensions: if distribution A and B both provide EXT_PKG and
    # distribution A depends on B then when returning modules in `get_modules`, we will
    # first return B and THEN A. We may want
    # other ways of specifying "load me after this if it exists" without depending on
    # the package. One way would be to rely on the description and have that info there.
    # Not sure of the use, though, so maybe we can skip for now.
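    # Concretely (hypothetical names): if distribution "ext-b" declares "ext-a" in
    # its requirements and both provide EXT_PKG, the computed order is
    # ["ext-a", "ext-b"]; ties are broken alphabetically.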

    # Key: distribution name/package path
    # Value: Dict containing:
    #   root_paths: The root path for all the files in this package. Can be a list in
    #               some rare cases
    #   meta_module: The module name of the meta file (if any), which contains
    #     information about how to package this extension (suffixes to include/exclude)
    #   files: The list of files to be included (or considered for inclusion) when
    #     packaging this extension
    mf_ext_packages = dict()

    # Key: extension point (one of _extension_point)
    # Value: another dictionary with
    #   Key: distribution name/full path to package
    #   Value: another dictionary with
    #    Key: Top-level package name (so in metaflow_extensions.X...., the X)
    #    Value: MFExtPackage
    extension_points_to_pkg = defaultdict(dict)

    # Key: configuration module name (string)
    # Value: list of packages in which this configuration module is present
    config_to_pkg = defaultdict(list)
    # Same as config_to_pkg for meta files
    meta_to_pkg = defaultdict(list)
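    # As an example (hypothetical names), after processing one distribution we could
    # have:
    #   config_to_pkg["metaflow_extensions.org_x.plugins.mfextinit_org_x"] ==
    #       ["my-ext-dist[org_x]"]
    #   meta_to_pkg["metaflow_extensions.mfextinit_org_x"] == ["my-ext-dist"]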

    # 1st step: look for distributions (the common case)
    for dist in metadata.distributions():
        if any(
            [pkg == EXT_PKG for pkg in (dist.read_text("top_level.txt") or "").split()]
        ):
            # In all cases (whether duplicate package or not), we remove the package
            # from the list of locations to look in.
            # This is not 100% accurate because it is possible that at the same
            # location there is a package and a non-package, but this is extremely
            # unlikely so we are going to ignore this case.
            dist_root = dist.locate_file(EXT_PKG).resolve().as_posix()
            all_paths.discard(dist_root)
            dist_name = dist.metadata["Name"]
            dist_version = dist.metadata["Version"]
            if restrict_to_directories:
                parent_dirs = list(
                    p.as_posix() for p in Path(dist_root).resolve().parents
                )
                if all(p not in parent_dirs for p in restrict_to_directories):
                    _ext_debug(
                        "Ignoring package at %s as it is not in the considered directories"
                        % dist_root
                    )
                    continue
            if dist_name in mf_ext_packages:
                _ext_debug(
                    "Ignoring duplicate package '%s' (duplicate paths in sys.path? (%s))"
                    % (dist_name, str(sys.path))
                )
                continue
            _ext_debug(
                "Found extension package '%s' at '%s'..." % (dist_name, dist_root)
            )

            files_to_include = []
            meta_module = None

            # At this point, we check to see what extension points this package
            # contributes to. This is to enable multiple namespace packages to contribute
            # to the same extension point (for example, you may have multiple packages
            # that have plugins)
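            # For a typical layout (hypothetical names), dist.files yields entries
            # whose parts look like
            #   ("metaflow_extensions", "org_x", "plugins", "mfextinit_org_x.py")
            # The checks below key off parts[0] (must be EXT_PKG), parts[1] (a
            # top-level package or a file directly under EXT_PKG) and the remaining
            # parts (a potential extension point).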
            for f in dist.files:
                parts = list(f.parts)

                if len(parts) > 1 and parts[0] == EXT_PKG:
                    # Ensure there is no __init__.py so that this package remains an
                    # implicit namespace (NS) package
                    if parts[1] == "__init__.py":
                        raise RuntimeError(
                            "Package '%s' providing '%s' is not an implicit namespace "
                            "package as required" % (dist_name, EXT_PKG)
                        )

                    # Record the file as a candidate for inclusion when packaging if
                    # needed
                    if not any(
                        parts[-1].endswith(suffix) for suffix in EXT_EXCLUDE_SUFFIXES
                    ):
                        files_to_include.append(os.path.join(*parts[1:]))

                    if parts[1] in init_ext_points:
                        # This is most likely a problem as we need an intermediate
                        # "identifier"
                        raise RuntimeError(
                            "Package '%s' should conform to '%s.X.%s' and not '%s.%s' where "
                            "X is your organization's name for example"
                            % (
                                dist_name,
                                EXT_PKG,
                                parts[1],
                                EXT_PKG,
                                parts[1],
                            )
                        )

                    # Check for any metadata; a distribution can have at most one
                    # meta file
                    if EXT_META_REGEXP.match(parts[1]) is not None:
                        potential_meta_module = ".".join([EXT_PKG, parts[1][:-3]])
                        if meta_module:
                            raise RuntimeError(
                                "Package '%s' defines more than one meta configuration: "
                                "'%s' and '%s' (at least)"
                                % (
                                    dist_name,
                                    meta_module,
                                    potential_meta_module,
                                )
                            )
                        meta_module = potential_meta_module
                        # Use dist_name here: the meta file applies to the whole
                        # distribution (dist_full_name may not be defined yet at this
                        # point in the file iteration)
                        _ext_debug(
                            "Found meta '%s' for '%s'" % (meta_module, dist_name)
                        )
                        meta_to_pkg[meta_module].append(dist_name)

                if len(parts) > 3 and parts[0] == EXT_PKG:
                    # We go over _extension_points *in order* to make sure we get more
                    # specific paths first

                    # Used to give useful errors in case multiple top-level packages
                    # are present in one distribution
                    dist_full_name = "%s[%s]" % (dist_name, parts[1])
                    for idx, ext_list in enumerate(list_ext_points):
                        if (
                            len(parts) > len(ext_list) + 2
                            and parts[2 : 2 + len(ext_list)] == ext_list
                        ):
                            # Check if this is an "init" file
                            config_module = None

                            if len(parts) == len(ext_list) + 3 and (
                                EXT_CONFIG_REGEXP.match(parts[-1]) is not None
                                or parts[-1] == "__init__.py"
                            ):
                                parts[-1] = parts[-1][:-3]  # Remove the .py
                                config_module = ".".join(parts)

                                config_to_pkg[config_module].append(dist_full_name)
                            cur_pkg = (
                                extension_points_to_pkg[_extension_points[idx]]
                                .setdefault(dist_name, {})
                                .get(parts[1])
                            )
                            if cur_pkg is not None:
                                if (
                                    config_module is not None
                                    and cur_pkg.config_module is not None
                                ):
                                    raise RuntimeError(
                                        "Package '%s' defines more than one "
                                        "configuration file for '%s': '%s' and '%s'"
                                        % (
                                            dist_full_name,
                                            _extension_points[idx],
                                            config_module,
                                            cur_pkg.config_module,
                                        )
                                    )
                                if config_module is not None:
                                    _ext_debug(
                                        "    Top-level '%s' found config file '%s'"
                                        % (parts[1], config_module)
                                    )
                                    extension_points_to_pkg[_extension_points[idx]][
                                        dist_name
                                    ][parts[1]] = MFExtPackage(
                                        package_name=dist_name,
                                        tl_package=parts[1],
                                        config_module=config_module,
                                    )
                            else:
                                _ext_debug(
                                    "    Top-level '%s' extends '%s' with config '%s'"
                                    % (parts[1], _extension_points[idx], config_module)
                                )
                                extension_points_to_pkg[_extension_points[idx]][
                                    dist_name
                                ][parts[1]] = MFExtPackage(
                                    package_name=dist_name,
                                    tl_package=parts[1],
                                    config_module=config_module,
                                )
                            break
            mf_ext_packages[dist_name] = {
                "root_paths": [dist_root],
                "meta_module": meta_module,
                "files": files_to_include,
                "version": dist_version,
            }
    # At this point, we have all the packages that contribute to EXT_PKG,
    # we now check to see if there is an order to respect based on dependencies. We will
    # return an ordered list that respects that order and is ordered alphabetically in
    # case of ties. We do not do any checks because we rely on pip to have done those.
    # Basically topological sort based on dependencies.
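    # Sketch of the sort (hypothetical names): with packages {"ext-a", "ext-b",
    # "ext-c"} where "ext-c" requires "ext-a", the roots are ["ext-a", "ext-b"]
    # (alphabetical) and "ext-c" is appended once its single requirement has been
    # processed, giving ["ext-a", "ext-b", "ext-c"].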
    pkg_to_reqs_count = {}
    req_to_dep = {}
    for pkg_name in mf_ext_packages:
        req_count = 0
        req_pkgs = [
            REQ_NAME.match(x).group(1) for x in metadata.requires(pkg_name) or []
        ]
        for req_pkg in req_pkgs:
            if req_pkg in mf_ext_packages:
                req_count += 1
                req_to_dep.setdefault(req_pkg, []).append(pkg_name)
        pkg_to_reqs_count[pkg_name] = req_count

    # Find roots
    mf_pkg_list = []
    to_process = []
    for pkg_name, count in pkg_to_reqs_count.items():
        if count == 0:
            to_process.append(pkg_name)

    # Add them in alphabetical order
    to_process.sort()
    mf_pkg_list.extend(to_process)
    # Find rest topologically
    while to_process:
        next_round = []
        for pkg_name in to_process:
            del pkg_to_reqs_count[pkg_name]
            for dep in req_to_dep.get(pkg_name, []):
                cur_req_count = pkg_to_reqs_count[dep]
                if cur_req_count == 1:
                    next_round.append(dep)
                else:
                    pkg_to_reqs_count[dep] = cur_req_count - 1
        # Add those in alphabetical order
        next_round.sort()
        mf_pkg_list.extend(next_round)
        to_process = next_round

    # Check that we got them all
    if len(pkg_to_reqs_count) > 0:
        raise RuntimeError(
            "Unresolved dependencies in '%s': %s"
            % (EXT_PKG, ", and ".join("'%s'" % p for p in pkg_to_reqs_count))
        )

    _ext_debug("'%s' distributions order is %s" % (EXT_PKG, str(mf_pkg_list)))

    # We check if we have any additional packages that are not installed as
    # distributions (i.e., present only on PYTHONPATH). We always put them *last*
    # in the load order, sorted alphabetically.
    all_paths_list = list(all_paths)
    all_paths_list.sort()

    # This block of code is the equivalent of the one above for distributions except
    # for PYTHONPATH packages. The functionality is identical, but it looks a little
    # different because we construct the file list instead of having it nicely provided
    # to us.
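    # For example (hypothetical): a PYTHONPATH entry /src/my_ext containing
    # metaflow_extensions/org_y/plugins/mfextinit_org_y.py leaves
    # /src/my_ext/metaflow_extensions in the remaining paths; it is walked below to
    # rebuild the same information as for distributions.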
    package_name_to_path = dict()
    if len(all_paths_list) > 0:
        _ext_debug("Non installed packages present at %s" % str(all_paths))
        for package_count, package_path in enumerate(all_paths_list):
            if restrict_to_directories:
                parent_dirs = list(
                    p.as_posix() for p in Path(package_path).resolve().parents
                )
                if all(p not in parent_dirs for p in restrict_to_directories):
                    _ext_debug(
                        "Ignoring non-installed package at %s as it is not in "
                        "the considered directories" % package_path
                    )
                    continue
            # We give the package an alternate, generated name. It is not exposed to
            # the end user and is only used to refer to the package internally; the
            # full path would not add much information, particularly when running on
            # a remote machine.
            # We keep a temporary mapping around for error messages while loading
            # for the first time.
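            # e.g. the first such path becomes "_pythonpath_0", the second
            # "_pythonpath_1", and so on; package_name_to_path keeps the mapping
            # back to the real path for error messages.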
            package_name = "_pythonpath_%d" % package_count
            _ext_debug(
                "Walking path %s (package name %s)" % (package_path, package_name)
            )
            package_name_to_path[package_name] = package_path
            base_depth = len(package_path.split("/"))
            files_to_include = []
            meta_module = None
            for root, dirs, files in os.walk(package_path):
                parts = root.split("/")
                cur_depth = len(parts)
                # relative_root strips out metaflow_extensions
                relative_root = "/".join(parts[base_depth:])
                relative_module = ".".join(parts[base_depth - 1 :])
                files_to_include.extend(
                    [
                        "/".join([relative_root, f]) if relative_root else f
                        for f in files
                        if not any(
                            [f.endswith(suffix) for suffix in EXT_EXCLUDE_SUFFIXES]
                        )
                    ]
                )
                if cur_depth == base_depth:
                    if "__init__.py" in files:
                        raise RuntimeError(
                            "'%s' at '%s' is not an implicit namespace package as required"
                            % (EXT_PKG, root)
                        )
                    for d in dirs:
                        if d in init_ext_points:
                            raise RuntimeError(
                                "Package at '%s' should conform to' %s.X.%s' and not "
                                "'%s.%s' where X is your organization's name for example"
                                % (root, EXT_PKG, d, EXT_PKG, d)
                            )
                    # Check for meta files for this package
                    meta_files = [
                        x for x in map(EXT_META_REGEXP.match, files) if x is not None
                    ]
                    if meta_files:
                        # We should have one meta file at most
                        if len(meta_files) > 1:
                            raise RuntimeError(
                                "Package at '%s' defines more than one meta file: %s"
                                % (
                                    package_path,
                                    ", and ".join(
                                        ["'%s'" % x.group(0) for x in meta_files]
                                    ),
                                )
                            )
                        else:
                            meta_module = ".".join(
                                [relative_module, meta_files[0].group(0)[:-3]]
                            )

                elif cur_depth > base_depth + 1:
                    # We want at least a top-level package name and something under it
                    tl_name = parts[base_depth]
                    tl_fullname = "%s[%s]" % (package_path, tl_name)
                    prefix_match = parts[base_depth + 1 :]
                    for idx, ext_list in enumerate(list_ext_points):
                        if prefix_match == ext_list:
                            # We check to see if this is an actual extension point
                            # or if we just have a directory on the way to another
                            # extension point. To do this, we check to see if we have
                            # any files or directories that are *not* directly another
                            # extension point
                            skip_extension = len(files) == 0
                            if skip_extension:
                                next_dir_idx = len(list_ext_points[idx])
                                ok_subdirs = [
                                    list_ext_points[j][next_dir_idx]
                                    for j in range(0, idx)
                                    if len(list_ext_points[j]) > next_dir_idx
                                ]
                                skip_extension = set(dirs).issubset(set(ok_subdirs))

                            if skip_extension:
                                _ext_debug(
                                    "    Skipping '%s' as no files/directory of interest"
                                    % _extension_points[idx]
                                )
                                continue

                            # Check for any "init" files
                            init_files = [
                                x.group(0)
                                for x in map(EXT_CONFIG_REGEXP.match, files)
                                if x is not None
                            ]
                            if "__init__.py" in files:
                                init_files.append("__init__.py")

                            config_module = None
                            if len(init_files) > 1:
                                raise RuntimeError(
                                    "Package at '%s' defines more than one configuration "
                                    "file for '%s': %s"
                                    % (
                                        tl_fullname,
                                        ".".join(prefix_match),
                                        ", and ".join(["'%s'" % x for x in init_files]),
                                    )
                                )
                            elif len(init_files) == 1:
                                config_module = ".".join(
                                    [relative_module, init_files[0][:-3]]
                                )
                                config_to_pkg[config_module].append(tl_fullname)

                            d = extension_points_to_pkg[_extension_points[idx]][
                                package_name
                            ] = dict()
                            d[tl_name] = MFExtPackage(
                                package_name=package_name,
                                tl_package=tl_name,
                                config_module=config_module,
                            )
                            _ext_debug(
                                "    Extends '%s' with config '%s'"
                                % (_extension_points[idx], config_module)
                            )
            mf_pkg_list.append(package_name)
            mf_ext_packages[package_name] = {
                "root_paths": [package_path],
                "meta_module": meta_module,
                "files": files_to_include,
                "version": "_local_",
            }

    # Sanity check that we only have one package per configuration file.
    # This prevents multiple packages from providing the same named configuration
    # file which would result in one overwriting the other if they are both installed.
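    # For example (hypothetical), two distributions both shipping
    # metaflow_extensions/org_x/plugins/mfextinit_org_x.py would be reported here
    # rather than silently shadowing one another.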
    errors = []
    for m, packages in config_to_pkg.items():
        if len(packages) > 1:
            errors.append(
                "    Packages %s define the same configuration module '%s'"
                % (", and ".join(["'%s'" % p for p in packages]), m)
            )
    for m, packages in meta_to_pkg.items():
        if len(packages) > 1:
            errors.append(
                "    Packages %s define the same meta module '%s'"
                % (", and ".join(["'%s'" % p for p in packages]), m)
            )
    if errors:
        raise RuntimeError(
            "Conflicts in '%s' files:\n%s" % (EXT_PKG, "\n".join(errors))
        )

    extension_points_to_pkg.default_factory = None

    # We have the load order globally; we now figure it out per extension point.
    for k, v in extension_points_to_pkg.items():
        # v is a dict distributionName/packagePath -> (dict tl_name -> MFExtPackage)
        l = [v[pkg].values() for pkg in mf_pkg_list if pkg in v]
        # In the case of the plugins.cards extension, we allow those packages
        # to be NS packages, so we only list the package once (in its first position).
        # In all other cases, we error out if we don't have a configuration file for
        # the package (either an __init__.py or an explicit mfextinit_*.py).
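        # For example (hypothetical): for k == "plugins", if the global order is
        # ["ext-a", "ext-b"], the per-point list keeps that order and contains one
        # MFExtPackage per (distribution, top-level package) pair.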
        final_list = []
        null_config_tl_package = set()
        for pkg in chain(*l):
            if pkg.config_module is None:
                if k == "plugins.cards":
                    # This is allowed here but we only keep one
                    if pkg.tl_package in null_config_tl_package:
                        continue
                    null_config_tl_package.add(pkg.tl_package)
                else:
                    package_path = package_name_to_path.get(pkg.package_name)
                    if package_path:
                        package_path = "at '%s'" % package_path
                    else:
                        package_path = "'%s'" % pkg.package_name
                    raise RuntimeError(
                        "Package %s does not define a configuration file for '%s'"
                        % (package_path, k)
                    )
            final_list.append(pkg)
        extension_points_to_pkg[k] = final_list
    return mf_ext_packages, extension_points_to_pkg
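
A minimal usage sketch (illustrative only: _get_extension_packages is an internal
helper, and the distribution names appearing in the output depend on what is
installed):

    from metaflow.extension_support import _get_extension_packages

    # all_pkgs: distribution/package name -> {"root_paths", "meta_module", "files", "version"}
    # ext_pts:  extension point -> list of MFExtPackage, in load order
    all_pkgs, ext_pts = _get_extension_packages()
    for point, pkgs in ext_pts.items():
        print(point, [p.tl_package for p in pkgs])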