in metaflow/plugins/pypi/conda_environment.py [0:0]
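# Context (not part of the original snippet): this method relies on names that
# are available at module scope in conda_environment.py - os, copy,
# urlparse/unquote (urllib.parse), ThreadPoolExecutor/as_completed
# (concurrent.futures), plus the LazyOpen file wrapper and the
# _datastore_packageroot helper, defined or imported elsewhere in the file.
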
def init_environment(self, echo, only_steps=None):
    # The implementation optimizes for latency by turning as many operations as
    # possible into cheap no-ops. Beyond that, we aim for a reasonable balance
    # between latency and code maintainability, without re-implementing the
    # internals of Micromamba and pip.
    # TODO: Introduce verbose logging
    #       https://github.com/Netflix/metaflow/issues/1494

    def environments(type_):
        seen = set()
        for step in self.flow:
            if only_steps and step.name not in only_steps:
                continue
            environment = self.get_environment(step)
            if type_ in environment and environment["id_"] not in seen:
                seen.add(environment["id_"])
                for platform in environment[type_]["platforms"]:
                    yield environment["id_"], {
                        **{
                            k: v
                            for k, v in environment[type_].items()
                            if k != "platforms"
                        },
                        **{"platform": platform},
                    }
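
    # Illustration (hedged example, not from the original source): for an
    # environment targeting two platforms, environments("conda") would yield
    # items shaped roughly like
    #     ("5ce4f3...", {"python": "3.11", ..., "platform": "linux-64"})
    #     ("5ce4f3...", {"python": "3.11", ..., "platform": "osx-arm64"})
    # i.e. the per-type spec with "platforms" replaced by a single "platform",
    # deduplicated by environment id.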

    def solve(id_, environment, type_):
        # Cached solve - should be quick!
        platform = environment["platform"]
        return (
            id_,
            (
                self.read_from_environment_manifest([id_, platform, type_])
                or self.write_to_environment_manifest(
                    [id_, platform, type_],
                    self.solvers[type_].solve(id_, **environment),
                )
            ),
            environment["python"],
            platform,
        )
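
    # The `or` above is a read-through cache: a previously written manifest
    # entry short-circuits the (expensive) solver call, and on a miss the fresh
    # solve is written back so the next run becomes a no-op. Each result is a
    # tuple of (id_, packages, python, platform), as unpacked by `cache` and
    # the download/create calls below.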

    def cache(storage, results, type_):
        # Map a package URL to its path in the datastore.
        def _path(url, local_path):
            # Special handling for VCS packages
            if url.startswith("git+"):
                base, _ = os.path.split(urlparse(url).path)
                _, file = os.path.split(local_path)
                # The ref pinned after "@" becomes a path component, so
                # different pins of the same repository cache separately.
                prefix = url.split("@")[-1]
                return urlparse(url).netloc + os.path.join(
                    unquote(base), prefix, file
                )
            else:
                return urlparse(url).netloc + urlparse(url).path
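
        # Illustrative examples (hedged, not from the original source):
        #   "https://files.pythonhosted.org/.../foo-1.0-py3-none-any.whl"
        #       -> "files.pythonhosted.org/.../foo-1.0-py3-none-any.whl"
        #   "git+https://github.com/org/repo@v1.2" with local file "pkg.whl"
        #       -> "github.com/org/v1.2/pkg.whl"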

        local_packages = {
            url: {
                # Path to package in datastore.
                "path": _path(url, local_path),
                # Path to package on local disk.
                "local_path": local_path,
            }
            for result in results
            for url, local_path in self.solvers[type_].metadata(*result).items()
        }
        dirty = set()
        # Prune the list of packages to cache.
        _meta = copy.deepcopy(local_packages)
        for id_, packages, _, _ in results:
            for package in packages:
                if package.get("path"):
                    # Cache only those packages that the manifest is unaware of.
                    local_packages.pop(package["url"], None)
                else:
                    package["path"] = _meta[package["url"]]["path"]
                    # The manifest for this environment now needs a rewrite.
                    dirty.add(id_)
        list_of_path_and_filehandle = [
            (
                package["path"],
                # Lazily fetch package from the interweb if needed.
                # TODO: Depending on the len_hint, the package might be
                #       downloaded from the interweb prematurely. save_bytes
                #       needs to be adjusted to handle this scenario.
                LazyOpen(
                    package["local_path"],
                    "rb",
                    url,
                ),
            )
            for url, package in local_packages.items()
        ]
        storage.save_bytes(
            list_of_path_and_filehandle,
            len_hint=len(list_of_path_and_filehandle),
        )
        for id_, packages, _, platform in results:
            if id_ in dirty:
                self.write_to_environment_manifest([id_, platform, type_], packages)

    storage = None
    if self.datastore_type not in ["local"]:
        # Initialize storage for caching if using a remote datastore.
        storage = self.datastore(_datastore_packageroot(self.datastore, echo))

    self.logger("Bootstrapping virtual environment(s) ...")
    # Sequence of operations:
    # 1. Start all conda solves in parallel.
    # 2. Download conda packages sequentially.
    # 3. Create and cache conda environments in parallel.
    # 4. Start PyPI solves in parallel after each conda environment is created.
    # 5. Download PyPI packages sequentially.
    # 6. Create and cache PyPI environments in parallel.
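    # Step 4 is ordered after step 3 since the PyPI solve runs pip against the
    # Python interpreter inside the freshly created conda environment, so that
    # environment has to exist first.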
    with ThreadPoolExecutor() as executor:
        # Start all conda solves in parallel.
        conda_futures = [
            executor.submit(lambda x: solve(*x, "conda"), env)
            for env in environments("conda")
        ]
        pypi_envs = {env[0]: env for env in environments("pypi")}
        pypi_futures = []
        # Process conda results sequentially for downloads.
        for future in as_completed(conda_futures):
            result = future.result()
            # Sequential conda download.
            self.solvers["conda"].download(*result)
            # Parallel conda create and cache.
            create_future = executor.submit(self.solvers["conda"].create, *result)
            if storage:
                executor.submit(cache, storage, [result], "conda")
            # Queue the PyPI solve to start after the conda create finishes.
            if result[0] in pypi_envs:
                # Solve each PyPI environment exactly once.
                pypi_env = pypi_envs.pop(result[0])

                # Bind create_future as a default argument so the closure
                # captures this iteration's future, not whichever future the
                # loop variable points to when the task eventually runs.
                def pypi_solve(env, create_future=create_future):
                    create_future.result()  # Wait for the conda create.
                    return solve(*env, "pypi")

                pypi_futures.append(executor.submit(pypi_solve, pypi_env))
        # Process PyPI results sequentially for downloads.
        for solve_future in pypi_futures:
            result = solve_future.result()
            # Sequential PyPI download.
            self.solvers["pypi"].download(*result)
            # Parallel PyPI create and cache.
            executor.submit(self.solvers["pypi"].create, *result)
            if storage:
                executor.submit(cache, storage, [result], "pypi")

    self.logger("Virtual environment(s) bootstrapped!")
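
# Illustrative usage (hedged sketch; `environment` is assumed to be an
# already-constructed CondaEnvironment bound to a flow):
#
#     environment.init_environment(echo=print, only_steps={"start", "train"})
#
# Steps outside only_steps are skipped entirely, and repeated invocations only
# pay for environments that are not yet solved, cached, and created.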