azdev/operations/extensions/__init__.py (276 lines of code) (raw):

# ----------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. # ----------------------------------------------------------------------------- from collections import OrderedDict import json import os import shutil import sys from knack.log import get_logger from knack.prompting import prompt_y_n from knack.util import CLIError from azdev.utilities import ( cmd, py_cmd, pip_cmd, display, get_ext_repo_paths, find_files, get_azure_config, get_azdev_config, require_azure_cli, heading, subheading, EXTENSION_PREFIX) from .version_upgrade import VersionUpgradeMod logger = get_logger(__name__) def add_extension(extensions): ext_paths = get_ext_repo_paths() if not ext_paths or ext_paths == ['_NONE_']: raise CLIError('Extension repo path is empty. Please try `azdev extension repo add` to add an extension repo') all_extensions = find_files(ext_paths, 'setup.py') if extensions == ['*']: paths_to_add = [os.path.dirname(path) for path in all_extensions if 'site-packages' not in path and 'vendored_sdks' not in path] else: paths_to_add = [] for path in all_extensions: folder = os.path.dirname(path) long_name = os.path.basename(folder) if long_name in extensions: paths_to_add.append(folder) extensions.remove(long_name) # raise error if any extension wasn't found if extensions: raise CLIError('extension(s) not found: {}'.format(' '.join(extensions))) for path in paths_to_add: result = pip_cmd('install -e {}'.format(path), "Adding extension '{}'...".format(path)) if result.error: raise result.error # pylint: disable=raising-bad-type def remove_extension(extensions): ext_paths = get_ext_repo_paths() installed_paths = find_files(ext_paths, '*.*-info') paths_to_remove = [] names_to_remove = [] if extensions == ['*']: paths_to_remove = [os.path.dirname(path) for path in installed_paths] names_to_remove = [os.path.basename(os.path.dirname(path)) for path in installed_paths] else: for path in installed_paths: folder = os.path.dirname(path) long_name = os.path.basename(folder) if long_name in extensions: paths_to_remove.append(folder) names_to_remove.append(long_name) extensions.remove(long_name) # raise error if any extension not installed if extensions: raise CLIError('extension(s) not installed: {}'.format(' '.join(extensions))) # removes any links that may have been added to site-packages. for ext in names_to_remove: pip_cmd('uninstall {} -y'.format(ext)) for path in paths_to_remove: for d in os.listdir(path): # delete the egg-info and dist-info folders to make the extension invisible to the CLI and azdev if d.endswith('egg-info') or d.endswith('dist-info'): path_to_remove = os.path.join(path, d) display("Removing '{}'...".format(path_to_remove)) shutil.rmtree(path_to_remove) def _get_installed_dev_extensions(dev_sources): from glob import glob installed = [] def _collect(path, depth=0, max_depth=3): if not os.path.isdir(path) or depth == max_depth or os.path.split(path)[-1].startswith('.'): return pattern = os.path.join(path, '*.egg-info') match = glob(pattern) if match: ext_path = os.path.dirname(match[0]) ext_name = os.path.split(ext_path)[-1] installed.append({'name': ext_name, 'path': ext_path}) else: for item in os.listdir(path): _collect(os.path.join(path, item), depth + 1, max_depth) for source in dev_sources: _collect(source) return installed def list_extensions(): from glob import glob azure_config = get_azure_config() dev_sources = azure_config.get('extension', 'dev_sources', None) dev_sources = dev_sources.split(',') if dev_sources else [] installed = _get_installed_dev_extensions(dev_sources) installed_names = [x['name'] for x in installed] results = [] for ext_path in find_files(dev_sources, 'setup.py'): # skip non-extension packages that may be in the extension folder (for example, from a virtual environment) try: glob_pattern = os.path.join(os.path.split(ext_path)[0], '{}*'.format(EXTENSION_PREFIX)) _ = glob(glob_pattern)[0] except IndexError: continue # ignore anything in site-packages folder if 'site-packages' in ext_path: continue folder = os.path.dirname(ext_path) long_name = os.path.basename(folder) if long_name not in installed_names: results.append({'name': long_name, 'install': '', 'path': folder}) else: results.append({'name': long_name, 'install': 'Installed', 'path': folder}) return results def show_extension(mod_name): ext_paths = get_ext_repo_paths() ext_mod_paths = [os.path.join(ext, "src", mod_name) for ext in ext_paths] mod_install_path = find_files(ext_mod_paths, '*.*-info') if not mod_install_path: raise CLIError('extension not installed using azdev: {}'.format(mod_name)) if len(mod_install_path) > 1: raise CLIError('extension {} duplicated, please specify extension name'.format(mod_name)) mod_install_dir = os.path.dirname(mod_install_path[0]) long_name = os.path.basename(mod_install_dir) assert long_name == mod_name mod_info = { "name": mod_name, "path": mod_install_dir } # extract pkg name from egg-info or dist-info folders logger.debug("Extracting pkg info from %s...", mod_install_path[0]) meta_files = ["PKG-INFO", "METADATA"] pkg_info_path = [os.path.join(mod_install_path[0], meta) for meta in meta_files if os.path.isfile(os.path.join(mod_install_path[0], meta))] from .util import get_pkg_info_from_pkg_metafile for pkg_info_file in pkg_info_path: pkg_info = get_pkg_info_from_pkg_metafile(pkg_info_file) mod_info.update(pkg_info) return mod_info def _get_sha256sum(a_file): import hashlib sha256 = hashlib.sha256() with open(a_file, 'rb') as f: sha256.update(f.read()) return sha256.hexdigest() def add_extension_repo(repos): from azdev.operations.setup import _check_repo az_config = get_azure_config() env_config = get_azdev_config() dev_sources = az_config.get('extension', 'dev_sources', None) dev_sources = dev_sources.split(',') if dev_sources else [] for repo in repos: repo = os.path.abspath(repo) _check_repo(repo) if repo not in dev_sources: dev_sources.append(repo) az_config.set_value('extension', 'dev_sources', ','.join(dev_sources)) env_config.set_value('ext', 'repo_paths', ','.join(dev_sources)) return list_extension_repos() def remove_extension_repo(repos): az_config = get_azure_config() env_config = get_azdev_config() dev_sources = az_config.get('extension', 'dev_sources', None) dev_sources = dev_sources.split(',') if dev_sources else [] for repo in repos: try: dev_sources.remove(os.path.abspath(repo)) except CLIError: logger.warning("Repo '%s' was not found in the list of repositories to search.", os.path.abspath(repo)) az_config.set_value('extension', 'dev_sources', ','.join(dev_sources)) env_config.set_value('ext', 'repo_paths', ','.join(dev_sources)) return list_extension_repos() def list_extension_repos(): az_config = get_azure_config() dev_sources = az_config.get('extension', 'dev_sources', None) return dev_sources.split(',') if dev_sources else dev_sources def update_extension_index(extensions): import re import tempfile from .util import get_ext_metadata, get_whl_from_url ext_repos = get_ext_repo_paths() index_path = next((x for x in find_files(ext_repos, 'index.json') if 'azure-cli-extensions' in x), None) if not index_path: raise CLIError("Unable to find 'index.json' in your extension repos. Have " "you cloned 'azure-cli-extensions' and added it to you repo " "sources with `azdev extension repo add`?") NAME_REGEX = r'.*/([^/]*)-\d+.\d+.\d+' for extension in extensions: # Get the URL extension = extension[extension.index('https'):] # Get extension WHL from URL if not extension.endswith('.whl') or not extension.startswith('https:'): raise CLIError('usage error: only URL to a WHL file currently supported.') # TODO: extend to consider other options ext_path = extension # Extract the extension name try: extension_name = re.findall(NAME_REGEX, ext_path)[0] extension_name = extension_name.replace('_', '-') except IndexError: raise CLIError('unable to parse extension name') # TODO: Update this! extensions_dir = tempfile.mkdtemp() ext_dir = tempfile.mkdtemp(dir=extensions_dir) whl_cache_dir = tempfile.mkdtemp() whl_cache = {} ext_file = get_whl_from_url(ext_path, extension_name, whl_cache_dir, whl_cache) with open(index_path, 'r') as infile: curr_index = json.loads(infile.read()) entry = { 'downloadUrl': ext_path, 'sha256Digest': _get_sha256sum(ext_file), 'filename': ext_path.split('/')[-1], 'metadata': get_ext_metadata(ext_dir, ext_file, extension_name) } if extension_name not in curr_index['extensions'].keys(): logger.info("Adding '%s' to index...", extension_name) curr_index['extensions'][extension_name] = [entry] else: logger.info("Updating '%s' in index...", extension_name) curr_index['extensions'][extension_name].append(entry) # update index and write back to file with open(os.path.join(index_path), 'w') as outfile: outfile.write(json.dumps(curr_index, indent=4, sort_keys=True)) def build_extensions(extensions, dist_dir='dist'): ext_paths = get_ext_repo_paths() if not ext_paths or ext_paths == ['_NONE_']: raise CLIError('Extension repo path is empty. Please try `azdev extension repo add` to add an extension repo') all_extensions = find_files(ext_paths, 'setup.py') paths_to_build = [] for path in all_extensions: folder = os.path.dirname(path) long_name = os.path.basename(folder) if long_name in extensions: paths_to_build.append(folder) extensions.remove(long_name) # raise error if any extension wasn't found if extensions: raise CLIError('extension(s) not found: {}'.format(' '.join(extensions))) original_cwd = os.getcwd() dist_dir = os.path.join(original_cwd, dist_dir) for path in paths_to_build: os.chdir(path) command = 'setup.py bdist_wheel -b bdist -d {}'.format(dist_dir) result = py_cmd(command, "Building extension '{}'...".format(path), is_module=False) if result.error: raise result.error # pylint: disable=raising-bad-type os.chdir(original_cwd) def publish_extensions(extensions, storage_account, storage_account_key, storage_container, dist_dir='dist', update_index=False, yes=False): from azure.multiapi.storage.v2018_11_09.blob import BlockBlobService heading('Publish Extensions') require_azure_cli() # rebuild the extensions subheading('Building WHLs') try: shutil.rmtree(dist_dir) except Exception as ex: # pylint: disable=broad-except logger.debug("Unable to clear folder '%s'. Error: %s", dist_dir, ex) build_extensions(extensions, dist_dir=dist_dir) whl_files = find_files(dist_dir, '*.whl') uploaded_urls = [] subheading('Uploading WHLs') for whl_path in whl_files: whl_file = os.path.split(whl_path)[-1] client = BlockBlobService(account_name=storage_account, account_key=storage_account_key) exists = client.exists(container_name=storage_container, blob_name=whl_file) # check if extension already exists unless user opted not to if not yes: if exists: if not prompt_y_n( "{} already exists. You may need to bump the extension version. Replace?".format(whl_file), default='n'): logger.warning("Skipping '%s'...", whl_file) continue # upload the WHL file client.create_blob_from_path(container_name=storage_container, blob_name=whl_file, file_path=os.path.abspath(whl_path)) url = client.make_blob_url(container_name=storage_container, blob_name=whl_file) logger.info(url) uploaded_urls.append(url) if update_index: subheading('Updating Index') update_extension_index(uploaded_urls) subheading('Published WHLs') for url in uploaded_urls: display(url) if not update_index: logger.warning('You still need to update the index for your changes!') logger.warning(' az extension update-index <URL>') def cal_next_version(base_meta_file, diff_meta_file, current_version, is_preview=None, is_experimental=None, next_version_pre_tag=None, next_version_segment_tag=None): with open(diff_meta_file, "r") as g: command_tree = json.load(g) module_name = command_tree["module_name"] version_op = VersionUpgradeMod(module_name, current_version, is_preview, is_experimental, base_meta_file, diff_meta_file, next_version_pre_tag, next_version_segment_tag) version_op.update_next_version() return version_op.format_outputs()