ftl/python/layer_builder.py (335 lines of code) (raw):

# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This package implements the Python package layer builder."""

import logging
import os
import tempfile
import subprocess
import concurrent.futures

from ftl.common import constants
from ftl.common import ftl_util
from ftl.common import ftl_error
from ftl.common import single_layer_image
from ftl.common import tar_to_dockerimage

from ftl.python import python_util


class PackageLayerBuilder(single_layer_image.CacheableLayerBuilder):
    """Turns one package directory into a single-layer image.

    The layer is produced from `pkg_dir` with
    ftl_util.zip_dir_to_layer_sha and tar_to_dockerimage.FromFSImage.
    """

    def __init__(self,
                 ctx=None,
                 descriptor_files=None,
                 pkg_dir=None,
                 dep_img_lyr=None,
                 cache_key_version=None,
                 cache=None):
        super(PackageLayerBuilder, self).__init__()
        self._ctx = ctx
        self._pkg_dir = pkg_dir
        self._descriptor_files = descriptor_files
        self._dep_img_lyr = dep_img_lyr
        self._cache_key_version = cache_key_version
        self._cache = cache

    def GetCacheKeyRaw(self):
        """Return the raw cache key: an empty prefix plus the key version.

        NOTE(review): the key does not vary with `pkg_dir`, so every package
        layer built with the same version shares one key -- confirm that
        this is intended.
        """
        cache_key = ""
        return "%s %s" % (cache_key, self._cache_key_version)

    def BuildLayer(self):
        """Build the layer and, if a cache was given, store it there.

        The cache is only written to here; it is never consulted first.
        """
        with ftl_util.Timing('building_python_pkg_layer'):
            self._build_layer()
        if self._cache:
            with ftl_util.Timing('uploading_python_pkg_layer'):
                self._cache.Set(self.GetCacheKey(), self.GetImage())

    def _build_layer(self):
        """Archive `pkg_dir` and wrap the result as this builder's image."""
        blob, u_blob = ftl_util.zip_dir_to_layer_sha(self._pkg_dir, "")
        overrides = ftl_util.generate_overrides(False)
        self._img = tar_to_dockerimage.FromFSImage([blob], [u_blob],
                                                   overrides)

    def _log_cache_result(self, hit):
        """Log a phase-1 cache hit or miss message for this builder's key."""
        if hit:
            cache_str = constants.PHASE_1_CACHE_HIT
        else:
            cache_str = constants.PHASE_1_CACHE_MISS
        logging.info(
            cache_str.format(
                key_version=constants.CACHE_KEY_VERSION,
                language='PYTHON (package)',
                key=self.GetCacheKey()))


class RequirementsLayerBuilder(single_layer_image.CacheableLayerBuilder):
    """Builds one image out of the wheels produced for a requirements file.

    NOTE: the list-valued defaults are shared between instances; this module
    copies them with list() before extending them.
    """

    def __init__(self,
                 ctx=None,
                 descriptor_files=None,
                 directory=None,
                 pkg_dir=None,
                 dep_img_lyr=None,
                 cache_key_version=None,
                 wheel_dir=constants.WHEEL_DIR,
                 virtualenv_dir=constants.VIRTUALENV_DIR,
                 python_cmd=[constants.PYTHON_DEFAULT_CMD],
                 pip_cmd=[constants.PIP_DEFAULT_CMD],
                 virtualenv_cmd=[constants.VIRTUALENV_DEFAULT_CMD],
                 venv_cmd=[constants.VENV_DEFAULT_CMD],
                 cache=None):
        super(RequirementsLayerBuilder, self).__init__()
        self._ctx = ctx
        self._pkg_dir = pkg_dir
        self._wheel_dir = wheel_dir
        self._virtualenv_dir = virtualenv_dir
        self._python_cmd = python_cmd
        self._pip_cmd = pip_cmd
        self._virtualenv_cmd = virtualenv_cmd
        self._venv_cmd = venv_cmd
        self._descriptor_files = descriptor_files
        self._directory = directory
        self._dep_img_lyr = dep_img_lyr
        self._cache_key_version = cache_key_version
        self._cache = cache

    def GetCacheKeyRaw(self):
        """Return the raw cache key.

        The key combines the parsed descriptor contents, the raw key of the
        dependency layer and the cache key version.
        """
        descriptor_contents = ftl_util.descriptor_parser(
            self._descriptor_files, self._ctx)
        cache_key = '%s %s' % (descriptor_contents,
                               self._dep_img_lyr.GetCacheKeyRaw())
        return "%s %s" % (cache_key, self._cache_key_version)

    def BuildLayer(self):
        """Use the cached image if there is one, otherwise build it.

        On a miss: set up the virtualenv, build wheels for the descriptor,
        install each wheel into its own directory, build one layer per
        directory on a thread pool and append the layers into one image.
        The image is stored in the cache when a cache was given.
        """
        cached_img = None
        if self._cache:
            with ftl_util.Timing('checking_cached_requirements.txt_layer'):
                key = self.GetCacheKey()
                cached_img = self._cache.Get(key)
                self._log_cache_result(cached_img is not None)
        if cached_img:
            self.SetImage(cached_img)
            return

        python_util.setup_virtualenv(self._virtualenv_dir,
                                     self._virtualenv_cmd, self._python_cmd,
                                     self._venv_cmd)
        pkg_descriptor = ftl_util.descriptor_parser(self._descriptor_files,
                                                    self._ctx)
        self._pip_download_wheels(pkg_descriptor)
        whls = self._resolve_whls()
        pkg_dirs = [self._whl_to_fslayer(whl) for whl in whls]

        # Each task appends into its own slot so that the final layer order
        # follows pkg_dirs rather than the order in which threads finish.
        slots = [[] for _ in pkg_dirs]
        with ftl_util.Timing('uploading_all_package_layers'):
            with concurrent.futures.ThreadPoolExecutor(
                    max_workers=constants.THREADS) as executor:
                futures = [
                    executor.submit(self._build_pkg, whl_pkg_dir, slot)
                    for whl_pkg_dir, slot in zip(pkg_dirs, slots)
                ]
                for future in futures:
                    # re-raises any exception raised inside the worker
                    future.result()
        req_txt_imgs = [img for slot in slots for img in slot]

        req_txt_image = ftl_util.AppendLayersIntoImage(req_txt_imgs)

        if req_txt_image:
            self.SetImage(req_txt_image)

            if self._cache:
                with ftl_util.Timing('uploading_requirements.txt_pkg_lyr'):
                    self._cache.Set(self.GetCacheKey(), self.GetImage())

    def _build_pkg(self, whl_pkg_dir, req_txt_imgs):
        """Build the layer for `whl_pkg_dir` and append its image.

        Args:
            whl_pkg_dir: directory handed to PackageLayerBuilder as pkg_dir.
            req_txt_imgs: list that receives the resulting image.
        """
        layer_builder = PackageLayerBuilder(
            ctx=self._ctx,
            descriptor_files=self._descriptor_files,
            pkg_dir=whl_pkg_dir,
            dep_img_lyr=self._dep_img_lyr,
            cache_key_version=self._cache_key_version,
            cache=self._cache)
        layer_builder.BuildLayer()
        req_txt_imgs.append(layer_builder.GetImage())

    def _resolve_whls(self):
        """Return the paths of all entries in the wheel directory, sorted.

        os.listdir returns names in arbitrary order; sorting keeps the
        resulting layer order stable between runs.
        """
        with ftl_util.Timing('resolving_whl_paths'):
            return [
                os.path.join(self._wheel_dir, f)
                for f in sorted(os.listdir(self._wheel_dir))
            ]

    def _whl_to_fslayer(self, whl):
        """pip-install `whl` under a fresh temp dir and return that dir.

        The install prefix is the virtualenv path re-rooted below the temp
        directory. The temp directory is not removed by this method.
        """
        tmp_dir = tempfile.mkdtemp()
        pkg_dir = os.path.join(tmp_dir, self._virtualenv_dir.lstrip('/'))
        os.makedirs(pkg_dir)

        pip_cmd_args = list(self._pip_cmd)
        pip_cmd_args.extend(['install', '--no-deps', '--prefix', pkg_dir, whl])
        pip_cmd_args.extend(constants.PIP_OPTIONS)
        ftl_util.run_command('pip_install_from_wheels', pip_cmd_args, None,
                             self._gen_pip_env())
        return tmp_dir

    def _pip_download_wheels(self, pkg_txt):
        """Install `wheel`, then run `pip wheel -r requirements.txt`.

        Both commands run in `self._directory` with the environment from
        _gen_pip_env and receive `pkg_txt` as command input.
        """
        ftl_util.run_command(
            'pip_install_wheel', ['pip', 'install', 'wheel'],
            cmd_cwd=self._directory,
            cmd_env=self._gen_pip_env(),
            cmd_input=pkg_txt,
            err_type=ftl_error.FTLErrors.USER())

        pip_cmd_args = list(self._pip_cmd)
        pip_cmd_args.extend(
            ['wheel', '-w', self._wheel_dir, '-r', 'requirements.txt'])
        pip_cmd_args.extend(constants.PIP_OPTIONS)
        ftl_util.run_command(
            'pip_download_wheels',
            pip_cmd_args,
            cmd_cwd=self._directory,
            cmd_env=self._gen_pip_env(),
            cmd_input=pkg_txt,
            err_type=ftl_error.FTLErrors.USER())

    def _gen_pip_env(self):
        """Return a copy of the process environment prepared for pip.

        PYTHONPATH is dropped, VIRTUAL_ENV is set to the virtualenv dir and
        the virtualenv's bin directory is put first on PATH.
        """
        pip_env = os.environ.copy()
        # bazel adds its own PYTHONPATH to the env
        # which must be removed for the pip calls to work properly
        pip_env.pop('PYTHONPATH', None)
        pip_env['VIRTUAL_ENV'] = self._virtualenv_dir

        venv_bin = self._virtualenv_dir + '/bin'
        inherited_path = pip_env.get('PATH')
        if inherited_path is None:
            # No PATH inherited: avoid a KeyError and avoid a trailing empty
            # PATH entry.
            pip_env['PATH'] = venv_bin
        else:
            pip_env['PATH'] = venv_bin + ':' + inherited_path
        return pip_env

    def _log_cache_result(self, hit):
        """Log a phase-1 cache hit or miss message for this builder's key."""
        if hit:
            cache_str = constants.PHASE_1_CACHE_HIT
        else:
            cache_str = constants.PHASE_1_CACHE_MISS
        logging.info(
            cache_str.format(
                key_version=constants.CACHE_KEY_VERSION,
                language='PYTHON (requirements)',
                key=self.GetCacheKey()))


class PipfileLayerBuilder(RequirementsLayerBuilder):
    """Builds the layer for one package described by `pkg_descriptor`.

    Elements 0 and 1 of `pkg_descriptor` are used as the package name and
    package version in the cache key and in log messages.
    """

    def __init__(self,
                 ctx=None,
                 descriptor_files=None,
                 directory=None,
                 pkg_descriptor=None,
                 pkg_dir=None,
                 dep_img_lyr=None,
                 cache_key_version=None,
                 wheel_dir=constants.WHEEL_DIR,
                 virtualenv_dir=constants.VIRTUALENV_DIR,
                 python_cmd=[constants.PYTHON_DEFAULT_CMD],
                 pip_cmd=[constants.PIP_DEFAULT_CMD],
                 virtualenv_cmd=[constants.VIRTUALENV_DEFAULT_CMD],
                 cache=None):
        # The parent stores all shared settings; venv_cmd keeps the
        # parent's default because this constructor does not accept it.
        super(PipfileLayerBuilder, self).__init__(
            ctx=ctx,
            descriptor_files=descriptor_files,
            directory=directory,
            pkg_dir=pkg_dir,
            dep_img_lyr=dep_img_lyr,
            cache_key_version=cache_key_version,
            wheel_dir=wheel_dir,
            virtualenv_dir=virtualenv_dir,
            python_cmd=python_cmd,
            pip_cmd=pip_cmd,
            virtualenv_cmd=virtualenv_cmd,
            cache=cache)
        self._pkg_descriptor = pkg_descriptor

    def GetCacheKeyRaw(self):
        """Return the raw cache key.

        The key combines the first two descriptor elements, the raw key of
        the dependency layer and the cache key version.
        """
        cache_key = "%s %s %s" % (self._pkg_descriptor[0],
                                  self._pkg_descriptor[1],
                                  self._dep_img_lyr.GetCacheKeyRaw())
        return "%s %s" % (cache_key, self._cache_key_version)

    def _log_cache_result(self, hit):
        """Log a phase-2 cache hit or miss message for this package."""
        if hit:
            cache_str = constants.PHASE_2_CACHE_HIT
        else:
            cache_str = constants.PHASE_2_CACHE_MISS
        logging.info(
            cache_str.format(
                key_version=constants.CACHE_KEY_VERSION,
                language='PYTHON',
                package_name=self._pkg_descriptor[0],
                package_version=self._pkg_descriptor[1],
                key=self.GetCacheKey()))

    def BuildLayer(self):
        """Use the cached image if there is one, otherwise build it.

        Raises:
            Exception: if the wheel directory does not hold exactly one
                entry after the wheel has been built.
        """
        cached_img = None
        if self._cache:
            with ftl_util.Timing('checking_cached_pipfile_pkg_layer'):
                key = self.GetCacheKey()
                cached_img = self._cache.Get(key)
                self._log_cache_result(cached_img is not None)
        if cached_img:
            self.SetImage(cached_img)
            return

        self._pip_download_wheels(' '.join(self._pkg_descriptor))
        whls = self._resolve_whls()
        if len(whls) != 1:
            raise Exception("expected one whl for one installed pkg")
        pkg_dir = self._whl_to_fslayer(whls[0])
        blob, u_blob = ftl_util.zip_dir_to_layer_sha(pkg_dir, "")
        overrides = ftl_util.generate_overrides(False)
        self._img = tar_to_dockerimage.FromFSImage([blob], [u_blob],
                                                   overrides)
        if self._cache:
            with ftl_util.Timing('uploading_pipfile_pkg_layer'):
                self._cache.Set(self.GetCacheKey(), self.GetImage())

    def _pip_download_wheels(self, pkg_txt):
        """Run `pip wheel --no-deps`, reading requirements from /dev/stdin.

        `pkg_txt` is passed to the command as its input.
        """
        pip_cmd_args = list(self._pip_cmd)
        pip_cmd_args.extend(
            ['wheel', '-w', self._wheel_dir, '-r', '/dev/stdin'])
        pip_cmd_args.extend(['--no-deps'])
        pip_cmd_args.extend(constants.PIP_OPTIONS)
        ftl_util.run_command(
            'pip_download_wheels',
            pip_cmd_args,
            cmd_cwd=self._directory,
            cmd_env=self._gen_pip_env(),
            cmd_input=pkg_txt,
            err_type=ftl_error.FTLErrors.USER())


class InterpreterLayerBuilder(single_layer_image.CacheableLayerBuilder):
    """Builds a layer holding the virtualenv directory."""

    def __init__(self,
                 virtualenv_dir=constants.VIRTUALENV_DIR,
                 python_cmd=[constants.PYTHON_DEFAULT_CMD],
                 virtualenv_cmd=[constants.VIRTUALENV_DEFAULT_CMD],
                 venv_cmd=[constants.VENV_DEFAULT_CMD],
                 cache_key_version=None,
                 cache=None):
        super(InterpreterLayerBuilder, self).__init__()
        self._virtualenv_dir = virtualenv_dir
        self._python_cmd = python_cmd
        self._virtualenv_cmd = virtualenv_cmd
        self._venv_cmd = venv_cmd
        self._cache_key_version = cache_key_version
        self._cache = cache

    def GetCacheKeyRaw(self):
        """Return the raw cache key.

        The key combines the interpreter's `--version` output, the
        virtualenv command, the virtualenv dir and the cache key version.
        """
        cache_key = '%s %s %s' % (self._python_version(),
                                  self._virtualenv_cmd, self._virtualenv_dir)
        return "%s %s" % (cache_key, self._cache_key_version)

    def _python_version(self):
        """Run `<python_cmd> --version` and return what it printed.

        Returns:
            Whichever of stdout/stderr is longer (stdout on a tie).

        Raises:
            Exception: if the command exits with a non-zero return code.
        """
        with ftl_util.Timing('check python version'):
            python_version_cmd = list(self._python_cmd)
            python_version_cmd.append('--version')
            logging.info("`python version` full cmd:\n%s",
                         " ".join(python_version_cmd))
            proc_pipe = subprocess.Popen(
                python_version_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            stdout, stderr = proc_pipe.communicate()
            logging.info("`python version` stderr:\n%s", stderr)
            if proc_pipe.returncode:
                raise Exception("error: `python version` returned code: %d" %
                                proc_pipe.returncode)
            # up until Python 3.4 the version info gets written to stderr
            return stdout if len(stdout) >= len(stderr) else stderr

    def BuildLayer(self):
        """Use the cached image if there is one, otherwise build it.

        A freshly built image is stored in the cache when one was given.
        """
        cached_img = None
        if self._cache:
            with ftl_util.Timing('checking_cached_interpreter_layer'):
                key = self.GetCacheKey()
                cached_img = self._cache.Get(key)
                self._log_cache_result(cached_img is not None)
        if cached_img:
            self.SetImage(cached_img)
            return

        with ftl_util.Timing('building_interpreter_layer'):
            self._build_layer()
        if self._cache:
            with ftl_util.Timing('uploading_interpreter_layer'):
                self._cache.Set(self.GetCacheKey(), self.GetImage())

    def _build_layer(self):
        """Set up the virtualenv and wrap its directory as the image."""
        python_util.setup_virtualenv(self._virtualenv_dir,
                                     self._virtualenv_cmd, self._python_cmd,
                                     self._venv_cmd)

        blob, u_blob = ftl_util.zip_dir_to_layer_sha(self._virtualenv_dir,
                                                     self._virtualenv_dir)

        overrides = ftl_util.generate_overrides(True, self._virtualenv_dir)
        self._img = tar_to_dockerimage.FromFSImage([blob], [u_blob],
                                                   overrides)

    def _log_cache_result(self, hit):
        """Log a phase-1 cache hit or miss message for this builder's key."""
        if hit:
            cache_str = constants.PHASE_1_CACHE_HIT
        else:
            cache_str = constants.PHASE_1_CACHE_MISS
        logging.info(
            cache_str.format(
                key_version=constants.CACHE_KEY_VERSION,
                language='PYTHON (interpreter)',
                key=self.GetCacheKey()))