azure-devops/azext_devops/dev/common/artifacttool_updater.py (144 lines of code) (raw):

# -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import io import os import platform import shutil import stat import sys import time import uuid import zipfile import humanfriendly import requests from knack.log import get_logger from knack.util import CLIError import distro from azext_devops.dev.common.services import get_connection from azext_devops.dev.common.config import AZ_DEVOPS_GLOBAL_CONFIG_DIR from azext_devops.dev.common.const import (ARTIFACTTOOL_OVERRIDE_PATH_ENVKEY, ARTIFACTTOOL_OVERRIDE_URL_ENVKEY, ARTIFACTTOOL_OVERRIDE_VERSION_ENVKEY) logger = get_logger(__name__) # pylint: disable=too-few-public-methods class ArtifactToolUpdater: def get_latest_artifacttool(self, organization): artifacttool_binary_override_path = os.environ.get(ARTIFACTTOOL_OVERRIDE_PATH_ENVKEY) if artifacttool_binary_override_path is not None: artifacttool_binary_path = artifacttool_binary_override_path logger.debug("ArtifactTool path was overriden to '%s' due to environment variable %s", artifacttool_binary_path, ARTIFACTTOOL_OVERRIDE_PATH_ENVKEY) else: logger.debug("Checking for a new ArtifactTool") artifacttool_binary_path = self._get_artifacttool(organization) return artifacttool_binary_path def _get_artifacttool(self, organization): # pylint: disable=no-self-use logger.debug("Checking for ArtifactTool updates") # Call the auto-update API to find the current version of ArtifactTool # If AZURE_DEVOPS_EXT_ARTIFACTTOOL_OVERRIDE_URL is set, instead always download from the URL artifacttool_override_url = os.environ.get(ARTIFACTTOOL_OVERRIDE_URL_ENVKEY) if artifacttool_override_url is not None: release_uri = artifacttool_override_url release_id = "custom_{}".format(uuid.uuid4()) logger.debug("ArtifactTool download URL is being overridden to '%s' (ID '%s')", release_uri, release_id) else: override_version = os.environ.get(ARTIFACTTOOL_OVERRIDE_VERSION_ENVKEY) try: release = _get_current_release(organization, override_version) except Exception as ex: logger.debug(ex, exc_info=True) raise CLIError('Failed to update Universal Packages tooling.\n {}'.format(ex)) release_uri, release_id = release # Determine the path for the release, and skip downloading if it already exists logger.debug("Checking if we already have ArtifactTool release '%s'", release_id) release_dir = _compute_release_dir(release_id) if os.path.exists(release_dir): logger.debug("Not updating ArtifactTool because the current release already exists at '%s'", release_dir) return release_dir # Doesn't already exist. Download and extract the release. logger.debug("Updating to ArtifactTool release %s since it doesn't exist at %s", release_id, release_dir) _update_artifacttool(release_uri, release_id) return release_dir def _update_artifacttool(uri, release_id): root = _compute_artifacttool_root() # Remove all existing releases. In the future we may maintain some old versions, # but right now we always delete them. if os.path.isdir(root): for item in os.listdir(root): path = os.path.join(root, item) if os.path.isdir(path): logger.debug("Trying to remove old release %s", item) shutil.rmtree(path, ignore_errors=True) # Failing cleanup is not fatal with humanfriendly.Spinner( # pylint: disable=no-member label="Downloading Universal Packages tooling ({})" .format(release_id), total=100, stream=sys.stderr) as spinner: spinner.step() logger.debug("Downloading ArtifactTool from %s", uri) # Make the request, determine the total size response = requests.get(uri, stream=True) content_length_header = response.headers['Content-Length'].strip() content_length = int(content_length_header) # Do the download, updating the progress bar content = io.BytesIO() bytes_so_far = 0 for chunk in response.iter_content(chunk_size=1024 * 512): if chunk: content.write(chunk) bytes_so_far += len(chunk) spinner.step(100 * float(bytes_so_far) / float(content_length)) # Extract the zip release_temp_dir = os.path.join(root, str(uuid.uuid4())) logger.debug("Extracting ArtifactTool to %s", release_temp_dir) f = zipfile.ZipFile(content) try: _mkdir_if_not_exist(release_temp_dir) f.extractall(path=release_temp_dir) # For Linux, ensure the executable bit is set on the binary "ArtifactTool" if it exists. # Python has a bug https://bugs.python.org/issue15795 where file permissions are not preserved. artifacttool_binary = os.path.join(release_temp_dir, "artifacttool") if os.path.exists(artifacttool_binary): artifacttool_stat = os.stat(artifacttool_binary) os.chmod(artifacttool_binary, artifacttool_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) # Move the release into the real releases location release_dir = _compute_release_dir(release_id) if os.path.exists(release_dir): logger.warning( "The Universal Packages tool already exists at the location %s. Skipping download.", release_dir) else: logger.debug("Moving downloaded ArtifactTool from %s to %s", release_temp_dir, release_dir) # number of times to retry retries = 10 for _ in range(retries - 1): try: os.rename(release_temp_dir, release_dir) break except BaseException as ex: # pylint: disable=broad-except logger.debug( "An error occurred while renaming the Universal Packages tooling: %s. Retrying...", ex) time.sleep(1) else: os.rename(release_temp_dir, release_dir) logger.info("Downloaded Universal Packages tooling successfully") except BaseException as ex: # pylint: disable=broad-except logger.error("An error occurred while extracting the Universal Packages tooling: %s", ex) logger.debug("Removing temporary directory %s", release_temp_dir) shutil.rmtree(release_temp_dir, ignore_errors=True) def _get_current_release(organization, override_version): connection = get_connection(organization) client = connection.get_client('azext_devops.dev.common.client_tool.client_tool_client.ClientToolClient') logger.debug("Looking up current version of ArtifactTool...") # Distro returns empty strings on Windows currently, so don't even send distro_name = distro.id() or None distro_version = distro.version() or None os_name = platform.system() arch = platform.machine() # For M1 macs, there is no version of artifact tool. However, the x86_64 # version can run under Rosetta, so we use that instead. if os_name == "Darwin" and arch in ["amd64", "arm64"]: arch = "x86_64" # Similarly for Windows ARM64 targets there is no version of artifact tool. However, the x86_64 # version can run under emulation, so we use that instead. if os_name == "Windows" and arch == "ARM64": arch = "x86_64" release = client.get_clienttool_release( "ArtifactTool", os_name=os_name, arch=arch, distro_name=distro_name, distro_version=distro_version, version=override_version) return (release.uri, _compute_id(release)) if release is not None else None def _mkdir_if_not_exist(path): try: os.makedirs(path) except OSError: # Ignore errors that were likely because the directory already exists if not os.path.isdir(path): raise def _compute_id(release): return "{}_{}_{}".format(release.name, release.rid, release.version) def _compute_artifacttool_root(): az_devops_cli_root = os.path.join(AZ_DEVOPS_GLOBAL_CONFIG_DIR, 'cli', 'tools') return os.path.join(az_devops_cli_root, "artifacttool") def _compute_release_dir(release_id): return os.path.join(_compute_artifacttool_root(), release_id)