azdev/utilities/path.py (184 lines of code) (raw):
# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------
import logging
import os
from glob import glob
from knack.util import CLIError
from .const import COMMAND_MODULE_PREFIX, EXTENSION_PREFIX, ENV_VAR_VIRTUAL_ENV
logger = logging.getLogger(__name__)
def extract_module_name(path):
import re
_CORE_NAME_REGEX = re.compile(r'azure-cli-(?P<name>[^/\\]+)[/\\]azure[/\\]cli')
_MOD_NAME_REGEX = re.compile(r'azure-cli[/\\]azure[/\\]cli[/\\]command_modules[/\\](?P<name>[^/\\]+)')
_EXT_NAME_REGEX = re.compile(r'.*(?P<name>azext_[^/\\]+).*')
for expression in [_MOD_NAME_REGEX, _CORE_NAME_REGEX, _EXT_NAME_REGEX]:
match = re.search(expression, path)
if not match:
continue
return match.groupdict().get('name')
raise CLIError('unexpected error: unable to extract name from path: {}'.format(path))
def get_env_path():
""" Returns the path to the current virtual environment.
:returns: Path (str) to the virtual env or None.
"""
env_path = None
for item in ENV_VAR_VIRTUAL_ENV:
env_path = os.environ.get(item)
if env_path:
break
return env_path
def get_azdev_repo_path():
""" Return the path to the azdev repo root.
:returns: Path (str) to azdev repo.
"""
here = os.path.dirname(os.path.realpath(__file__))
while not os.path.exists(os.path.join(here, '.git')):
here = os.path.dirname(here)
return here
def get_cli_repo_path():
""" Return the path to the Azure CLI repo.
:returns: Path (str) to Azure CLI repo.
"""
from configparser import NoSectionError
from .config import get_azdev_config
try:
return get_azdev_config().get('cli', 'repo_path')
except NoSectionError:
raise CLIError('Unable to retrieve CLI repo path from config. Please run `azdev setup`.')
def get_ext_repo_paths():
""" Return the paths to the Azure CLI dev extensions.
:returns: Path (str) to Azure CLI dev extension repos.
"""
from configparser import NoSectionError
from .config import get_azdev_config
try:
return get_azdev_config().get('ext', 'repo_paths').split(',')
except NoSectionError:
raise CLIError('Unable to retrieve extensions repo path from config. Please run `azdev setup`.')
def find_file(file_name):
""" Returns the path to a specific file.
:returns: Path (str) to file or None.
"""
for path, _, files in os.walk(os.getcwd()):
if file_name in files:
return path
return None
def find_files(root_paths, file_pattern):
""" Returns the paths to all files that match a given pattern.
:returns: Paths ([str]) to files matching the given pattern.
"""
if isinstance(root_paths, str):
root_paths = [root_paths]
paths = []
for root_path in root_paths:
for path, _, _ in os.walk(root_path):
pattern = os.path.join(path, file_pattern)
paths.extend(glob(pattern))
return paths
def make_dirs(path):
""" Create directories recursively. """
import errno
try:
os.makedirs(os.path.expanduser(path))
except OSError as exc: # Python <= 2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
def get_name_index(invert=False, include_whl_extensions=False):
""" Returns a dictionary containing the long and short names of modules and extensions is {SHORT:LONG} format or
{LONG:SHORT} format when invert=True. """
from azure.cli.core.extension import EXTENSIONS_DIR # pylint: disable=import-error
table = {}
cli_repo_path = get_cli_repo_path()
ext_repo_paths = get_ext_repo_paths()
# unified azure-cli package (2.0.68 and later)
paths = os.path.normcase(
os.path.join(
cli_repo_path, 'src', 'azure-cli', 'azure', 'cli', 'command_modules', '*', '__init__.py'
)
)
modules_paths = glob(paths)
core_paths = glob(os.path.normcase(os.path.join(cli_repo_path, 'src', '*', 'setup.py')))
ext_paths = [x for x in find_files(ext_repo_paths, '*.*-info') if 'site-packages' not in x]
whl_ext_paths = []
if include_whl_extensions:
whl_ext_paths = [x for x in find_files(EXTENSIONS_DIR, '*.*-info') if 'site-packages' not in x]
def _update_table(paths, key):
folder = None
long_name = None
short_name = None
for path in paths:
folder = os.path.dirname(path)
base_name = os.path.basename(folder)
# determine long-names
if key == 'ext':
short_name = base_name
for item in os.listdir(folder):
if item.startswith(EXTENSION_PREFIX):
long_name = item
break
elif base_name.startswith(COMMAND_MODULE_PREFIX):
long_name = base_name
short_name = base_name.replace(COMMAND_MODULE_PREFIX, '') or '__main__'
else:
short_name = base_name
long_name = '{}{}'.format(COMMAND_MODULE_PREFIX, base_name)
if not invert:
table[short_name] = long_name
else:
table[long_name] = short_name
_update_table(modules_paths, 'mod')
_update_table(core_paths, 'core')
_update_table(ext_paths, 'ext')
_update_table(whl_ext_paths, 'ext')
return table
# pylint: disable=too-many-statements
def get_path_table(include_only=None, include_whl_extensions=False):
""" Returns a table containing the long and short names of different modules and extensions and the path to them.
The structure looks like:
{
'core': {
NAME: PATH,
...
},
'mod': {
NAME: PATH,
...
},
'ext': {
NAME: PATH,
...
}
}
"""
from azure.cli.core.extension import EXTENSIONS_DIR # pylint: disable=import-error
# determine whether the call will filter or return all
if isinstance(include_only, str):
include_only = [include_only]
get_all = not include_only
table = {}
cli_repo_path = get_cli_repo_path()
ext_repo_paths = get_ext_repo_paths()
paths = os.path.normcase(
os.path.join(
cli_repo_path, 'src', 'azure-cli', 'azure', 'cli', 'command_modules', '*', '__init__.py'
)
)
modules_paths = glob(paths)
core_paths = glob(os.path.normcase(os.path.join(cli_repo_path, 'src', '*', 'setup.py')))
ext_paths = [x for x in find_files(ext_repo_paths, '*.*-info') if 'site-packages' not in x]
whl_ext_paths = [x for x in find_files(EXTENSIONS_DIR, '*.*-info') if 'site-packages' not in x]
def _update_table(package_paths, key):
if key not in table:
table[key] = {}
for path in package_paths:
folder = os.path.dirname(path)
base_name = os.path.basename(folder)
if key == 'ext':
short_name = base_name
long_name = next((item for item in os.listdir(folder) if item.startswith(EXTENSION_PREFIX)), None)
else:
short_name = base_name
long_name = '{}{}'.format(COMMAND_MODULE_PREFIX, base_name)
if get_all:
table[key][long_name if key == 'ext' else short_name] = folder
elif not include_only:
return # nothing left to filter
else:
# check and update filter
if short_name in include_only:
include_only.remove(short_name)
table[key][short_name] = folder
if long_name in include_only:
# long name takes precedence to ensure path doesn't appear twice
include_only.remove(long_name)
table[key].pop(short_name, None)
table[key][long_name] = folder
# Since include_only.remove will delete the found name
# Adjust the order of _update_table to ensure that extension is updated first.
# When the extension name and module name are the same
# Let azdev style tests the extension instead of the main module.
_update_table(ext_paths, 'ext')
if include_whl_extensions:
_update_table(whl_ext_paths, 'ext')
_update_table(modules_paths, 'mod')
_update_table(core_paths, 'core')
if include_only:
whl_extensions = [mod for whl_ext_path in whl_ext_paths for mod in include_only if mod in whl_ext_path]
if whl_extensions:
err = 'extension(s): [ {} ] installed from a wheel may need --include-whl-extensions option'.format(
', '.join(whl_extensions))
raise CLIError(err)
raise CLIError('unrecognized modules: [ {} ]'.format(', '.join(include_only)))
return table
def calc_selected_modules(modules=None, include_whl_extensions=False):
# allow user to run only on CLI or extensions
cli_only = modules == ['CLI']
ext_only = modules == ['EXT']
if cli_only or ext_only:
modules = None
selected_modules = get_path_table(include_only=modules, include_whl_extensions=include_whl_extensions)
if cli_only:
selected_modules['ext'] = {}
if ext_only:
selected_modules['core'] = {}
selected_modules['mod'] = {}
return selected_modules
def calc_selected_mod_names(modules=None, include_whl_extensions=False):
selected_modules = calc_selected_modules(modules, include_whl_extensions=include_whl_extensions)
if not any(selected_modules.values()):
logger.warning('No commands selected to check.')
selected_mod_names = list(selected_modules['mod'].keys())
selected_mod_names += list(selected_modules['ext'].keys())
selected_mod_names += list(selected_modules['core'].keys())
return selected_mod_names