sagemaker_image_builder/main.py (353 lines of code) (raw):

import argparse import base64 import copy import glob import json import os import re import shutil import subprocess import boto3 import docker from conda.models.match_spec import MatchSpec from docker.errors import ContainerError from semver import Version from sagemaker_image_builder.changelog_generator import generate_change_log from sagemaker_image_builder.dependency_upgrader import ( _MAJOR, _MINOR, _PATCH, _get_dependency_upper_bound_for_runtime_upgrade, ) from sagemaker_image_builder.package_report import ( generate_package_size_report, generate_package_staleness_report, ) from sagemaker_image_builder.release_notes_generator import generate_release_notes from sagemaker_image_builder.utils import ( dump_conda_package_metadata, get_dir_for_version, get_match_specs, get_semver, is_exists_dir_for_version, ) _docker_client = docker.from_env() def create_and_get_semver_dir(version: Version, image_config: list[dict], exist_ok: bool = False): dir = get_dir_for_version(version) if os.path.exists(dir): if not exist_ok: raise Exception() if not os.path.isdir(dir): raise Exception() # Delete all files except the additional_packages_env_in_file _delete_all_files_except_additional_packages_input_files(dir, image_config) else: os.makedirs(dir) return dir def _delete_all_files_except_additional_packages_input_files(base_version_dir, image_config): additional_package_env_in_files = [ image_generator_config["additional_packages_env_in_file"] for image_generator_config in image_config ] for filename in os.listdir(base_version_dir): if filename not in additional_package_env_in_files: file_path = os.path.join(base_version_dir, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: print("Failed to delete %s. Reason: %s" % (file_path, e)) def _create_new_version_artifacts(args): with open(args.image_config_file) as jsonfile: image_config = json.load(jsonfile) runtime_version_upgrade_type = args.runtime_version_upgrade_type if runtime_version_upgrade_type == _PATCH: runtime_version_upgrade_func = "bump_patch" elif runtime_version_upgrade_type == _MINOR: runtime_version_upgrade_func = "bump_minor" elif runtime_version_upgrade_type == _MAJOR: runtime_version_upgrade_func = "bump_major" else: raise Exception() base_patch_version = get_semver(args.base_patch_version) if base_patch_version.prerelease and args.pre_release_identifier: # We support creating new patch/major/minor versions from a pre-release version. # But We don't support passing the pre_release_identifier parameter while creating a new # patch/major/minor versions from the pre-release version. raise Exception() next_version = _get_next_version(base_patch_version, runtime_version_upgrade_func) if args.pre_release_identifier: next_version = next_version.replace(prerelease=args.pre_release_identifier) base_version_dir = get_dir_for_version(base_patch_version) new_version_dir = create_and_get_semver_dir(next_version, image_config, args.force) for image_generator_config in image_config: _create_new_version_conda_specs( base_version_dir, new_version_dir, runtime_version_upgrade_type, image_generator_config ) _copy_static_files(base_version_dir, new_version_dir, str(next_version.major), runtime_version_upgrade_type) with open(f"{new_version_dir}/source-version.txt", "w") as f: f.write(args.base_patch_version) def _copy_static_files(base_version_dir, new_version_dir, new_version_major, runtime_version_upgrade_type): for f in glob.glob(f"{base_version_dir}/gpu.arg_based_env.in"): shutil.copy2(f, new_version_dir) for f in glob.glob(f"{base_version_dir}/patch_*"): shutil.copy2(f, new_version_dir) # For patches, get Dockerfile+dirs from base patch # For minor/major, get Dockerfile+dirs from template if runtime_version_upgrade_type == _PATCH: base_path = base_version_dir else: base_path = f"template/v{new_version_major}" for f in glob.glob(os.path.relpath(f"{base_path}/Dockerfile")): shutil.copy2(f, new_version_dir) if int(new_version_major) >= 1: # dirs directory doesn't exist for v0. It was introduced only for v1 dirs_relative_path = os.path.relpath(f"{base_path}/dirs") for f in glob.glob(dirs_relative_path): shutil.copytree(f, os.path.join(new_version_dir, "dirs")) def _create_new_version_conda_specs( base_version_dir, new_version_dir, runtime_version_upgrade_type, image_generator_config ): env_in_filename = image_generator_config["build_args"]["ENV_IN_FILENAME"] additional_packages_env_in_filename = image_generator_config["additional_packages_env_in_file"] env_out_filename = image_generator_config["env_out_filename"] base_match_specs_in = get_match_specs(f"{base_version_dir}/{env_in_filename}") base_match_specs_out = get_match_specs(f"{base_version_dir}/{env_out_filename}") additional_packages_match_specs_in = get_match_specs(f"{new_version_dir}/{additional_packages_env_in_filename}") # Add all the match specs from the previous version. # If a package is present in both additional packages as well as the previous version, then # use the values in the previous version additional_packages_match_specs_in.update(base_match_specs_in) out = [] for package_name in additional_packages_match_specs_in: match_out: MatchSpec = base_match_specs_out.get(package_name) if match_out is None: # No restriction on what versions to use. out.append(f"conda-forge::{package_name}") else: channel = match_out.get("channel").channel_name min_version_inclusive = match_out.get("version") assert str(min_version_inclusive).startswith("==") min_version_inclusive = str(min_version_inclusive).removeprefix("==") max_version_str = _get_dependency_upper_bound_for_runtime_upgrade( package_name, min_version_inclusive, runtime_version_upgrade_type ) out.append(f"{channel}::{package_name}[version='>={min_version_inclusive}{max_version_str}']") with open(f"{new_version_dir}/{env_in_filename}", "w") as f: f.write("# This file is auto-generated.\n") f.write("\n".join(out)) f.write("\n") # This new line is pretty important. See code documentation in Dockerfile for the reasoning. def create_major_version_artifacts(args): _create_new_version_artifacts(args) def create_minor_version_artifacts(args): _create_new_version_artifacts(args) def create_patch_version_artifacts(args): _create_new_version_artifacts(args) def build_images(args): with open(args.image_config_file) as jsonfile: image_config = json.load(jsonfile) target_version = get_semver(args.target_patch_version) image_ids, image_versions = _build_local_images(target_version, args.target_ecr_repo, image_config, args.force) generate_release_notes(target_version, image_config) # Upload to ECR before running tests so that only the exact image which we tested goes to public # TODO: Move after tests are stabilized if args.target_ecr_repo is not None: _push_images_upstream(image_versions, args.region) def _push_images_upstream(image_versions_to_push: list[dict[str, str]], region: str): print(f"Will now push the images to ECR: {image_versions_to_push}") for i in image_versions_to_push: username, password = _get_ecr_credentials(region, i["repository"]) _docker_client.images.push( repository=i["repository"], tag=i["tag"], auth_config={"username": username, "password": password} ) print(f"Successfully pushed these images to ECR: {image_versions_to_push}") def _get_config_for_image(target_version_dir: str, image_generator_config, force_rebuild) -> dict: if not os.path.exists(target_version_dir + "/" + image_generator_config["env_out_filename"]) or force_rebuild: return image_generator_config config_for_image = copy.deepcopy(image_generator_config) # Use the existing env.out to create the conda environment. Pass that as env.in config_for_image["build_args"]["ENV_IN_FILENAME"] = image_generator_config["env_out_filename"] # Remove ARG_BASED_ENV_IN_FILENAME if it exists config_for_image["build_args"].pop("ARG_BASED_ENV_IN_FILENAME", None) return config_for_image # Returns a tuple of: 1/ list of actual images generated; 2/ list of tagged images. A given image can be tagged by # multiple different strings - for e.g., a CPU image can be tagged as '1.3.2-cpu', '1.3-cpu', '1-cpu' and/or # 'latest-cpu'. Therefore, (1) is strictly a subset of (2). def _build_local_images( target_version: Version, target_ecr_repo_list: list[str], image_config: list[dict], force: bool ) -> (list[str], list[dict[str, str]]): target_version_dir = get_dir_for_version(target_version) generated_image_ids = [] generated_image_versions = [] for image_generator_config in image_config: config = _get_config_for_image(target_version_dir, image_generator_config, force) # BuildKit must be used for volume mounting during build, but isn't supported by docker-py (https://github.com/docker/docker-py/issues/2230) # So instead we enable via env variable, and then call docker build via cli. os.environ["DOCKER_BUILDKIT"] = "1" raw_build_result = "" # Docker build takes args like `--build-arg Key1=Value1 --build-arg Key2=Value2` build_arg_options = sum([["--build-arg", f"{k}={v}"] for k, v in config["build_args"].items()], []) docker_build_command = ["docker", "build", "--rm", "--pull"] + build_arg_options + [f"./{target_version_dir}"] try: raw_build_result = subprocess.check_output( docker_build_command, stderr=subprocess.STDOUT, universal_newlines=True ) except subprocess.CalledProcessError as e: print(f"Build failed with exit code {e.returncode}. Output:") # Prints output from Docker build print(e.output) raise # Parse the output image_id = None for line in raw_build_result.splitlines(): if line.startswith("#"): # Image id format in Docker build output if "writing image sha256:" in line.lower(): match = re.search(r"sha256:([a-f0-9]+)", line) if match: image_id = match.group(1) # Now we can get image using docker-py image = _docker_client.images.get(image_id) print(f"Successfully built an image with id: {image.id}") generated_image_ids.append(image.id) try: container_logs = _docker_client.containers.run( image=image.id, detach=False, auto_remove=True, command="conda list --explicit" ) except ContainerError as e: print(e.container.logs().decode("utf-8")) # After printing the logs, raise the exception (which is the old behavior) raise with open(f'{target_version_dir}/{config["env_out_filename"]}', "wb") as f: f.write(container_logs) # Generate change logs. Use the original image generator config which contains the name # of the actual env.in file instead of the 'config'. generate_change_log(target_version, image_generator_config) image_tag_suffix = config["image_tag_suffix"] if "image_tag_suffix" in config else "" image_tags_to_apply = [ f"{i}{image_tag_suffix}" for i in _get_version_tags(target_version, config["env_out_filename"]) ] if target_ecr_repo_list is not None: for target_ecr_repo in target_ecr_repo_list: for t in image_tags_to_apply: image.tag(target_ecr_repo, tag=t) generated_image_versions.append({"repository": target_ecr_repo, "tag": t}) # Tag the image for testing image.tag(f"localhost/{config["image_name"]}", f"{str(target_version)}{image_tag_suffix}") return generated_image_ids, generated_image_versions def _get_next_version(current_version: Version, upgrade_func: str) -> Version: next_version = getattr(current_version, upgrade_func)() if current_version.prerelease: # Semver Ignores prerelease identifier when we do bump_{patch/minor/major} next_version = next_version.replace(prerelease=current_version.prerelease) return next_version # At some point of time, let's say some patch versions exist for both 2.6 and 2.7, and we create new patch # versions for both of them. Now, for the new 2.6.x, we can tag it as '2.6.x-cpu' and '2.6-cpu' but NOT '2-cpu' because # there is a more recent version (i.e. 2.7.x) that should be considered '2-cpu'. So, given a patch version, the # following function returns a list of versions for which the current patch version is latest for. # For versions with pre-release identifier, this method will return the appropriate tags # Example: For an version 2.0.0-beta, this method will return [2.0.0-beta, 2.0-beta, 2-beta, # latest-beta] def _get_version_tags(target_version: Version, env_out_file_name: str) -> list[str]: # First, add '2.6.x' as is. res = [str(target_version)] prerelease_version_suffix = f"-{target_version.prerelease}" if target_version.prerelease else "" # If we were to add '2.6', check if '2.6.(x+1)' is present. if not is_exists_dir_for_version(_get_next_version(target_version, "bump_patch"), env_out_file_name): res.append(f"{target_version.major}.{target_version.minor}{prerelease_version_suffix}") else: return res # If we were to add '2', check if '2.7' is present. if not is_exists_dir_for_version(_get_next_version(target_version, "bump_minor"), env_out_file_name): res.append(f"{target_version.major}{prerelease_version_suffix}") else: return res # If we were to add 'latest', check if '3.0.0' is present. if not is_exists_dir_for_version(_get_next_version(target_version, "bump_major"), env_out_file_name): res.append(f"latest{prerelease_version_suffix}") return res def _get_ecr_credentials(region, repository: str) -> (str, str): _ecr_client_config_name = "ecr-public" if repository.startswith("public.ecr.aws") else "ecr" _ecr_client = boto3.client(_ecr_client_config_name, region_name=region) _authorization_data = _ecr_client.get_authorization_token()["authorizationData"] if _ecr_client_config_name == "ecr": # If we are using the ecr private client, then fetch the first index from authorizationData _authorization_data = _authorization_data[0] return base64.b64decode(_authorization_data["authorizationToken"]).decode().split(":") def get_arg_parser(): parser = argparse.ArgumentParser(description="A command line utility to create new image versions.") subparsers = parser.add_subparsers(dest="subcommand") create_major_version_parser = subparsers.add_parser( "create-major-version-artifacts", help="Creates a new image major version." ) create_major_version_parser.set_defaults(func=create_major_version_artifacts, runtime_version_upgrade_type=_MAJOR) create_minor_version_parser = subparsers.add_parser( "create-minor-version-artifacts", help="Creates a new image minor version." ) create_minor_version_parser.set_defaults(func=create_minor_version_artifacts, runtime_version_upgrade_type=_MINOR) create_patch_version_parser = subparsers.add_parser( "create-patch-version-artifacts", help="Creates a new image patch version." ) create_patch_version_parser.set_defaults(func=create_patch_version_artifacts, runtime_version_upgrade_type=_PATCH) # Common arguments for p in [create_major_version_parser, create_minor_version_parser, create_patch_version_parser]: p.add_argument( "--base-patch-version", required=True, help="Specify the base patch version from which a new version should be created.", ) p.add_argument( "--image-config-file", required=True, help="A json file contains the docker image generator configuration.", ) p.add_argument( "--pre-release-identifier", help="Optionally specify the pre-release identifier for this new version that should be created.", ) p.add_argument( "--force", action="store_true", help="Overwrites any existing directory corresponding to the new version that will be generated.", ) build_image_parser = subparsers.add_parser("build", help="Builds a new image from the Dockerfile.") build_image_parser.add_argument( "--target-patch-version", required=True, help="Specify the target version of image needs to be built.", ) build_image_parser.add_argument( "--image-config-file", required=True, help="A json file containing the docker image generator configuration.", ) build_image_parser.add_argument( "--force", action="store_true", help="Builds a new docker image which will fetch the latest versions of each package in " "the conda environment. Any existing env.out file will be overwritten.", ) build_image_parser.add_argument( "--target-ecr-repo", action="append", help="Specify the AWS ECR repository in which this image needs to be uploaded.", ) build_image_parser.add_argument("--region", help="Specify the region of the ECR repository.") build_image_parser.set_defaults(func=build_images) package_staleness_parser = subparsers.add_parser( "generate-staleness-report", help="Generates package staleness report for each of the marquee packages in the given image version.", ) package_staleness_parser.set_defaults(func=generate_package_staleness_report) package_staleness_parser.add_argument( "--image-config-file", required=True, help="A json file contains the docker image generator configuration.", ) package_staleness_parser.add_argument( "--target-patch-version", required=True, help="Specify the base patch version for which the package staleness report needs to be generated.", ) package_size_parser = subparsers.add_parser( "generate-size-report", help="Generates package size report for each of the packages in the given image version.", ) package_size_parser.set_defaults(func=generate_package_size_report) package_size_parser.add_argument( "--image-config-file", required=True, help="A json file contains the docker image generator configuration.", ) package_size_parser.add_argument( "--base-patch-version", required=False, help="Specify the base patch version for which the package size report needs to be generated.", ) package_size_parser.add_argument( "--target-patch-version", required=True, help="Specify the target patch version for which the package size report needs to be generated.", ) package_size_parser.add_argument( "--validate", action="store_true", help="Validate package size delta and raise error if the validation failed.", ) conda_package_metadata_parser = subparsers.add_parser( "get-conda-package-metadata", help="Collect and dump conda package versions and sizes in current activated conda environment.", ) conda_package_metadata_parser.set_defaults(func=dump_conda_package_metadata) conda_package_metadata_parser.add_argument( "-H", "--human-readable", action="store_true", help="Print human-readable size information" ) return parser def parse_args(parser): args = parser.parse_args() if args.subcommand is None: parser.print_help() else: args.func(args) def main(): parse_args(get_arg_parser()) if __name__ == "__main__": main()