def init_environment()

in metaflow/plugins/pypi/conda_environment.py [0:0]


    def init_environment(self, echo, only_steps=None):
        """
        Solve, download, create and (optionally) cache the flow's environments.

        Conda environments are handled first; the PyPI solve for an environment
        id is only started once a conda environment with that id has been
        created. Packages are cached in the datastore only when
        `self.datastore_type` is not "local".

        Parameters
        ----------
        echo : Callable
            Forwarded to `_datastore_packageroot` when a non-local datastore
            is in use.
        only_steps : Collection[str], optional, default None
            When truthy, steps whose name is not in this collection are skipped.

        Raises
        ------
        Exception
            Any exception raised by a solve, download, create or cache task is
            re-raised here instead of being dropped with its future.
        """
        # The implementation optimizes for latency to ensure as many operations can
        # be turned into cheap no-ops as feasible. Otherwise, we focus on maintaining
        # a balance between latency and maintainability of code without re-implementing
        # the internals of Micromamba and Pip.

        # TODO: Introduce verbose logging
        #       https://github.com/Netflix/metaflow/issues/1494

        def environments(type_):
            # Yield (id_, spec) once per platform for every distinct environment id
            # that has a `type_` section. `spec` is the `type_` section with the
            # "platforms" list replaced by a single "platform" entry.
            seen = set()
            for step in self.flow:
                if only_steps and step.name not in only_steps:
                    continue
                environment = self.get_environment(step)
                if type_ in environment and environment["id_"] not in seen:
                    seen.add(environment["id_"])
                    for platform in environment[type_]["platforms"]:
                        yield environment["id_"], {
                            **{
                                k: v
                                for k, v in environment[type_].items()
                                if k != "platforms"
                            },
                            **{"platform": platform},
                        }

        def solve(id_, environment, type_):
            # Cached solve - should be quick!
            # Returns (id_, packages, python, platform); the solver is only invoked
            # when the environment manifest has no (truthy) entry for this key.
            platform = environment["platform"]
            return (
                id_,
                (
                    self.read_from_environment_manifest([id_, platform, type_])
                    or self.write_to_environment_manifest(
                        [id_, platform, type_],
                        self.solvers[type_].solve(id_, **environment),
                    )
                ),
                environment["python"],
                platform,
            )

        def cache(storage, results, type_):
            # Upload packages that the manifest has no datastore path for and then
            # persist the newly assigned paths back into the manifest.
            def _path(url, local_path):
                parsed = urlparse(url)
                # Special handling for VCS packages
                if url.startswith("git+"):
                    base, _ = os.path.split(parsed.path)
                    _, file = os.path.split(local_path)
                    # Everything after the last "@" in the URL becomes a path
                    # component of the datastore key.
                    prefix = url.split("@")[-1]
                    return parsed.netloc + os.path.join(unquote(base), prefix, file)
                else:
                    return parsed.netloc + parsed.path

            local_packages = {
                url: {
                    # Path to package in datastore.
                    "path": _path(url, local_path),
                    # Path to package on local disk.
                    "local_path": local_path,
                }
                for result in results
                for url, local_path in self.solvers[type_].metadata(*result).items()
            }
            # Ids whose manifest entry gained at least one new "path".
            dirty = set()
            # Prune list of packages to cache.

            # Keep an untouched copy since entries are popped from local_packages
            # below while their paths may still need to be looked up.
            _meta = copy.deepcopy(local_packages)
            for id_, packages, _, _ in results:
                for package in packages:
                    if package.get("path"):
                        # Cache only those packages that manifest is unaware of
                        local_packages.pop(package["url"], None)
                    else:
                        package["path"] = _meta[package["url"]]["path"]
                        dirty.add(id_)

            list_of_path_and_filehandle = [
                (
                    package["path"],
                    # Lazily fetch package from the interweb if needed.
                    # TODO: Depending on the len_hint, the package might be downloaded from
                    #       the interweb prematurely. save_bytes needs to be adjusted to handle
                    #       this scenario.
                    LazyOpen(
                        package["local_path"],
                        "rb",
                        url,
                    ),
                )
                for url, package in local_packages.items()
            ]
            storage.save_bytes(
                list_of_path_and_filehandle,
                len_hint=len(list_of_path_and_filehandle),
            )
            for id_, packages, _, platform in results:
                if id_ in dirty:
                    self.write_to_environment_manifest([id_, platform, type_], packages)

        def pypi_solve(env, conda_create_future):
            # The future is passed in explicitly: reading it from the enclosing
            # loop's scope would be late-bound and could observe the create task
            # of a *later* conda environment by the time this runs on a worker.
            conda_create_future.result()  # Wait for conda create
            return solve(*env, "pypi")

        storage = None
        if self.datastore_type not in ["local"]:
            # Initialize storage for caching if using a remote datastore
            storage = self.datastore(_datastore_packageroot(self.datastore, echo))

        self.logger("Bootstrapping virtual environment(s) ...")
        # Sequence of operations:
        #  1. Start all conda solves in parallel
        #  2. Download conda packages sequentially
        #  3. Create and cache conda environments in parallel
        #  4. Start PyPI solves in parallel after each conda environment is created
        #  5. Download PyPI packages sequentially
        #  6. Create and cache PyPI environments in parallel
        with ThreadPoolExecutor() as executor:
            # Start all conda solves in parallel
            conda_futures = [
                executor.submit(solve, id_, environment, "conda")
                for id_, environment in environments("conda")
            ]

            # NOTE(review): keyed by environment id only, so when an environment
            # lists several PyPI platforms just the last one yielded is kept.
            # Confirm whether that is intended.
            pypi_envs = {env[0]: env for env in environments("pypi")}
            pypi_futures = []
            # Create/cache tasks whose outcome nothing else waits on; they are
            # checked at the end so that their failures are not silently lost.
            background_futures = []

            # Process conda results sequentially for downloads
            for future in as_completed(conda_futures):
                result = future.result()
                # Sequential conda download
                self.solvers["conda"].download(*result)
                # Parallel conda create and cache
                create_future = executor.submit(self.solvers["conda"].create, *result)
                background_futures.append(create_future)
                if storage:
                    background_futures.append(
                        executor.submit(cache, storage, [result], "conda")
                    )

                # Queue PyPI solve to start after conda create
                if result[0] in pypi_envs:
                    # solve pypi envs uniquely
                    pypi_env = pypi_envs.pop(result[0])
                    pypi_futures.append(
                        executor.submit(pypi_solve, pypi_env, create_future)
                    )

            # Process PyPI results sequentially for downloads
            for solve_future in pypi_futures:
                result = solve_future.result()
                # Sequential PyPI download
                self.solvers["pypi"].download(*result)
                # Parallel PyPI create and cache
                background_futures.append(
                    executor.submit(self.solvers["pypi"].create, *result)
                )
                if storage:
                    background_futures.append(
                        executor.submit(cache, storage, [result], "pypi")
                    )

            # Surface any exception raised by a create or cache task.
            for background_future in background_futures:
                background_future.result()
        self.logger("Virtual environment(s) bootstrapped!")