metaflow/package.py:

import importlib
import os
import sys
import tarfile
import time
import json

from io import BytesIO

from .user_configs.config_parameters import CONFIG_FILE, dump_config_values

from .extension_support import EXT_PKG, package_mfext_all
from .metaflow_config import DEFAULT_PACKAGE_SUFFIXES
from .exception import MetaflowException
from .util import to_unicode
from . import R
from .info_file import INFO_FILE

DEFAULT_SUFFIXES_LIST = DEFAULT_PACKAGE_SUFFIXES.split(",")
METAFLOW_SUFFIXES_LIST = [".py", ".html", ".css", ".js"]


class NonUniqueFileNameToFilePathMappingException(MetaflowException):
    headline = "Non Unique file path for a file name included in code package"

    def __init__(self, filename, file_paths, lineno=None):
        msg = (
            "Filename %s included in the code package includes multiple different paths for the same name : %s.\n"
            "The `filename` in the `add_to_package` decorator hook requires a unique `file_path` to `file_name` mapping"
            % (filename, ", ".join(file_paths))
        )
        super().__init__(msg=msg, lineno=lineno)


# this is os.walk(followlinks=True) with cycle detection
def walk_without_cycles(top_root):
    seen = set()

    def _recurse(root):
        for parent, dirs, files in os.walk(root):
            for d in dirs:
                path = os.path.join(parent, d)
                if os.path.islink(path):
                    # Breaking loops: never follow the same symlink twice
                    #
                    # NOTE: this also means that links to sibling links are
                    # not followed. In this case:
                    #
                    #   x -> y
                    #   y -> oo
                    #   oo/real_file
                    #
                    # real_file is only included twice, not three times
                    reallink = os.path.realpath(path)
                    if reallink not in seen:
                        seen.add(reallink)
                        for x in _recurse(path):
                            yield x
            yield parent, files

    for x in _recurse(top_root):
        yield x


class MetaflowPackage(object):
    def __init__(self, flow, environment, echo, suffixes=DEFAULT_SUFFIXES_LIST):
        self.suffixes = list(set().union(suffixes, DEFAULT_SUFFIXES_LIST))
        self.environment = environment
        self.metaflow_root = os.path.dirname(__file__)

        self.flow_name = flow.name
        self._flow = flow
        self.create_time = time.time()
        environment.init_environment(echo)
        for step in flow:
            for deco in step.decorators:
                deco.package_init(flow, step.__name__, environment)
        self.blob = self._make()

    def _walk(self, root, exclude_hidden=True, suffixes=None):
        if suffixes is None:
            suffixes = []
        root = to_unicode(root)  # handle files/folder with non ascii chars
        prefixlen = len("%s/" % os.path.dirname(root))
        for (
            path,
            files,
        ) in walk_without_cycles(root):
            if exclude_hidden and "/." in path:
                continue
            # path = path[2:] # strip the ./ prefix
            # if path and (path[0] == '.' or './' in path):
            #    continue
            for fname in files:
                if (fname[0] == "." and fname in suffixes) or (
                    fname[0] != "."
                    and any(fname.endswith(suffix) for suffix in suffixes)
                ):
                    p = os.path.join(path, fname)
                    yield p, p[prefixlen:]

    def path_tuples(self):
        """
        Returns list of (path, arcname) to be added to the job package, where
        `arcname` is the alternative name for the file in the package.
        """
        # We want the following contents in the tarball
        # Metaflow package itself
        for path_tuple in self._walk(
            self.metaflow_root, exclude_hidden=False, suffixes=METAFLOW_SUFFIXES_LIST
        ):
            yield path_tuple

        # Metaflow extensions; for now, we package *all* extensions but this may change
        # at a later date; it is possible to call `package_mfext_package` instead of
        # `package_mfext_all` but in that case, make sure to also add a
        # metaflow_extensions/__init__.py file to properly "close" the metaflow_extensions
        # package and prevent other extensions from being loaded that may be
        # present in the rest of the system
        for path_tuple in package_mfext_all():
            yield path_tuple

        # Any custom packages exposed via decorators
        deco_module_paths = {}
        for step in self._flow:
            for deco in step.decorators:
                for path_tuple in deco.add_to_package():
                    file_path, file_name = path_tuple
                    # Check if the path is not duplicated as
                    # many steps can have the same packages being imported
                    if file_name not in deco_module_paths:
                        deco_module_paths[file_name] = file_path
                        yield path_tuple
                    elif deco_module_paths[file_name] != file_path:
                        raise NonUniqueFileNameToFilePathMappingException(
                            file_name, [deco_module_paths[file_name], file_path]
                        )

        # the package folders for environment
        for path_tuple in self.environment.add_to_package():
            yield path_tuple

        if R.use_r():
            # the R working directory
            for path_tuple in self._walk(
                "%s/" % R.working_dir(), suffixes=self.suffixes
            ):
                yield path_tuple
            # the R package
            for path_tuple in R.package_paths():
                yield path_tuple
        else:
            # the user's working directory
            flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + "/"
            for path_tuple in self._walk(flowdir, suffixes=self.suffixes):
                yield path_tuple

    def _add_configs(self, tar):
        buf = BytesIO()
        buf.write(json.dumps(dump_config_values(self._flow)).encode("utf-8"))
        self._add_file(tar, os.path.basename(CONFIG_FILE), buf)

    def _add_info(self, tar):
        buf = BytesIO()
        buf.write(
            json.dumps(
                self.environment.get_environment_info(include_ext_info=True)
            ).encode("utf-8")
        )
        self._add_file(tar, os.path.basename(INFO_FILE), buf)

    @staticmethod
    def _add_file(tar, filename, buf):
        info = tarfile.TarInfo(filename)
        buf.seek(0)
        info.size = len(buf.getvalue())
        # Setting this default to Dec 3, 2019
        info.mtime = 1575360000
        tar.addfile(info, buf)

    def _make(self):
        def no_mtime(tarinfo):
            # a modification time change should not change the hash of
            # the package. Only content modifications will.
            # Setting this default to Dec 3, 2019
            tarinfo.mtime = 1575360000
            return tarinfo

        buf = BytesIO()
        with tarfile.open(
            fileobj=buf, mode="w:gz", compresslevel=3, dereference=True
        ) as tar:
            self._add_info(tar)
            self._add_configs(tar)
            for path, arcname in self.path_tuples():
                tar.add(path, arcname=arcname, recursive=False, filter=no_mtime)

        blob = bytearray(buf.getvalue())
        blob[4:8] = [0] * 4  # Reset 4 bytes from offset 4 to account for ts
        return blob

    def __str__(self):
        return "<code package for flow %s (created @ %s)>" % (
            self.flow_name,
            time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(self.create_time)),
        )