azdev/operations/testtool/__init__.py (261 lines of code) (raw):
# -----------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -----------------------------------------------------------------------------
import glob
from importlib import import_module
import json
import os
import re
from subprocess import CalledProcessError
import sys
from knack.log import get_logger
from knack.util import CLIError
from azdev.utilities import (
display, output, heading, subheading,
cmd as raw_cmd, py_cmd, pip_cmd, find_file, IS_WINDOWS,
ENV_VAR_TEST_LIVE,
COMMAND_MODULE_PREFIX, EXTENSION_PREFIX,
make_dirs, get_azdev_config_dir,
get_path_table, require_virtual_env, get_name_index)
from .pytest_runner import get_test_runner
from .profile_context import ProfileContext, current_profile
from .incremental_strategy import CLIAzureDevOpsContext
logger = get_logger(__name__)
# pylint: disable=too-many-statements,too-many-locals
def run_tests(tests, xml_path=None, discover=False, in_series=False,
run_live=False, profile=None, last_failed=False, pytest_args=None,
no_exit_first=False, mark=None,
git_source=None, git_target=None, git_repo=None,
cli_ci=False):
require_virtual_env()
DEFAULT_RESULT_FILE = 'test_results.xml'
DEFAULT_RESULT_PATH = os.path.join(get_azdev_config_dir(), DEFAULT_RESULT_FILE)
heading('Run Tests')
path_table = get_path_table()
target_tests = set()
if not tests:
tests = list(path_table['mod'].keys()) + list(path_table['core'].keys()) + list(path_table['ext'].keys())
if tests == ['CLI']:
tests = list(path_table['mod'].keys()) + list(path_table['core'].keys())
elif tests == ['EXT']:
tests = list(path_table['ext'].keys())
else:
target_tests = set(tests)
test_index = _get_test_index(profile or current_profile(), discover, target_tests=target_tests)
# filter out tests whose modules haven't changed
modified_mods = _filter_by_git_diff(tests, test_index, git_source, git_target, git_repo)
if modified_mods:
display('\nTest on modules: {}\n'.format(', '.join(modified_mods)))
if cli_ci is True:
ctx = CLIAzureDevOpsContext(git_repo, git_source, git_target)
modified_mods = ctx.filter(test_index)
# resolve the path at which to dump the XML results
xml_path = xml_path or DEFAULT_RESULT_PATH
if not xml_path.endswith('.xml'):
xml_path = os.path.join(xml_path, DEFAULT_RESULT_FILE)
# process environment variables
if run_live:
logger.warning('RUNNING TESTS LIVE')
os.environ[ENV_VAR_TEST_LIVE] = 'True'
def _find_test(index, name):
name_comps = name.split('.')
num_comps = len(name_comps)
key_error = KeyError()
for i in range(num_comps):
check_name = '.'.join(name_comps[(-1 - i):])
try:
match = index[check_name]
if check_name != name:
logger.info("Test found using just '%s'. The rest of the name was ignored.\n", check_name)
return match
except KeyError as ex:
key_error = ex
continue
raise key_error
# lookup test paths from index
test_paths = []
for t in modified_mods:
try:
test_path = os.path.normpath(_find_test(test_index, t))
test_paths.append(test_path)
except KeyError:
logger.warning("'%s' not found. If newly added, re-run with --discover", t)
continue
exit_code = 0
# Tests have been collected. Now run them.
if not test_paths:
logger.warning('No tests selected to run.')
sys.exit(exit_code)
exit_code = 0
with ProfileContext(profile):
runner = get_test_runner(parallel=not in_series,
log_path=xml_path,
last_failed=last_failed,
no_exit_first=no_exit_first,
mark=mark)
exit_code = runner(test_paths=test_paths, pytest_args=pytest_args)
sys.exit(0 if not exit_code else 1)
def _filter_by_git_diff(tests, test_index, git_source, git_target, git_repo):
from azdev.utilities import diff_branches, extract_module_name
from azdev.utilities.git_util import summarize_changed_mods
if not any([git_source, git_target, git_repo]):
return tests
if not all([git_target, git_repo]):
raise CLIError('usage error: [--src NAME] --tgt NAME --repo PATH')
files_changed = diff_branches(git_repo, git_target, git_source)
mods_changed = summarize_changed_mods(files_changed)
repo_path = str(os.path.abspath(git_repo)).lower()
to_remove = []
for key in tests:
test_path = test_index.get(key, None)
if test_path and test_path.lower().startswith(repo_path):
mod_name = extract_module_name(test_path)
if next((x for x in mods_changed if mod_name in x), None):
# has changed, so do not filter out
continue
# in not in the repo or has not changed, filter out
to_remove.append(key)
# remove the unchanged modules
tests = [t for t in tests if t not in to_remove]
logger.info('Filtered out: %s', to_remove)
return tests
def _discover_module_tests(mod_name, mod_data):
# get the list of test files in each module
total_tests = 0
total_files = 0
logger.info('Mod: %s', mod_name)
try:
contents = os.listdir(mod_data['filepath'])
test_files = {
x[:-len('.py')]: {} for x in contents if x.startswith('test_') and x.endswith('.py')
}
total_files = len(test_files)
except FileNotFoundError:
logger.info(' No test files found.')
return None
for file_name in test_files:
mod_data['files'][file_name] = {}
test_file_path = mod_data['base_path'] + '.' + file_name
try:
module = import_module(test_file_path)
except ImportError as ex:
logger.info(' %s', ex)
continue
module_dict = module.__dict__
possible_test_classes = {x: y for x, y in module_dict.items() if not x.startswith('_')}
for class_name, class_def in possible_test_classes.items():
try:
class_dict = class_def.__dict__
except AttributeError:
# skip non-class symbols in files like constants, imported methods, etc.
continue
if class_dict.get('__module__') == test_file_path:
tests = [x for x in class_def.__dict__ if x.startswith('test_')]
if tests:
mod_data['files'][file_name][class_name] = tests
total_tests += len(tests)
logger.info(' %s tests found in %s files.', total_tests, total_files)
return mod_data
# pylint: disable=too-many-statements, too-many-locals
def _discover_tests(profile, target_tests):
""" Builds an index of tests so that the user can simply supply the name they wish to test instead of the
full path.
"""
profile_split = profile.split('-')
profile_namespace = '_'.join([profile_split[-1]] + profile_split[:-1])
heading('Discovering Tests')
path_table = get_path_table()
core_modules = path_table['core'].items()
command_modules = path_table['mod'].items()
extensions = path_table['ext'].items()
inverse_name_table = get_name_index(invert=True)
module_data = {}
logger.info('\nCore Modules: %s', ', '.join([name for name, _ in core_modules]))
for mod_name, mod_path in core_modules:
file_path = mod_path
for comp in mod_name.split('-'):
file_path = os.path.join(file_path, comp)
mod_data = {
'alt_name': 'main' if mod_name == 'azure-cli' else mod_name.replace(COMMAND_MODULE_PREFIX, ''),
'filepath': os.path.join(file_path, 'tests'),
'base_path': '{}.tests'.format(mod_name).replace('-', '.'),
'files': {}
}
tests = _discover_module_tests(mod_name, mod_data)
if tests:
module_data[mod_name] = tests
logger.info('\nCommand Modules: %s', ', '.join([name for name, _ in command_modules]))
for mod_name, mod_path in command_modules:
mod_data = {
# Modules don't technically have azure-cli-foo moniker anymore, but preserving
# for consistency.
'alt_name': '{}{}'.format(COMMAND_MODULE_PREFIX, mod_name),
'filepath': os.path.join(
mod_path, 'tests', profile_namespace),
'base_path': 'azure.cli.command_modules.{}.tests.{}'.format(mod_name, profile_namespace),
'files': {}
}
tests = _discover_module_tests(mod_name, mod_data)
if tests:
module_data[mod_name] = tests
logger.info('\nExtensions: %s', ', '.join([name for name, _ in extensions if name]))
for mod_name, mod_path in extensions:
glob_pattern = os.path.normcase(os.path.join('{}*'.format(EXTENSION_PREFIX)))
try:
file_path = glob.glob(os.path.join(mod_path, glob_pattern))[0]
except IndexError:
logger.debug("No extension found at: %s", os.path.join(mod_path, glob_pattern))
continue
import_name = os.path.basename(file_path)
mod_data = {
'alt_name': inverse_name_table[mod_name],
'filepath': os.path.join(file_path, 'tests', profile_namespace),
'base_path': '{}.tests.{}'.format(import_name, profile_namespace),
'files': {}
}
tests = _discover_module_tests(import_name, mod_data)
if tests:
module_data[mod_name] = tests
test_index = {}
conflicted_keys = []
def add_to_index(key, path):
from azdev.utilities import extract_module_name
key = key or mod_name
if key in test_index:
if key not in conflicted_keys:
conflicted_keys.append(key)
mod1 = extract_module_name(path)
mod2 = extract_module_name(test_index[key])
if mod1 != mod2:
# resolve conflicted keys by prefixing with the module name and a dot (.)
if key in target_tests or mod1 in target_tests or mod2 in target_tests:
logger.warning("'%s' exists in both '%s' and '%s'. Resolve using `%s.%s` or `%s.%s`"
"Duplication exists in: \n\t%s\n\t%s\n",
key, mod1, mod2, mod1, key, mod2, key, path, test_index[key])
test_index['{}.{}'.format(mod1, key)] = path
test_index['{}.{}'.format(mod2, key)] = test_index[key]
else:
if key in target_tests or mod1 in target_tests:
logger.error("'%s' exists twice in the '%s' module. "
"Please rename one or both and re-run --discover. "
"Duplication exists in: \n\t%s\n\t%s\n", key, mod1, test_index[key], path)
else:
test_index[key] = path
# build the index
for mod_name, mod_data in module_data.items():
# don't add empty mods to the index
if not mod_data:
continue
mod_path = mod_data['filepath']
for file_name, file_data in mod_data['files'].items():
file_path = os.path.join(mod_path, file_name) + '.py'
for class_name, test_list in file_data.items():
for test_name in test_list:
test_path = '{}::{}::{}'.format(file_path, class_name, test_name)
add_to_index(test_name, test_path)
class_path = '{}::{}'.format(file_path, class_name)
add_to_index(class_name, class_path)
add_to_index(file_name, file_path)
add_to_index(mod_name, mod_path)
add_to_index(mod_data['alt_name'], mod_path)
# remove the conflicted keys since they would arbitrarily point to a random implementation
for key in conflicted_keys:
del test_index[key]
return test_index
def _get_test_index(profile, discover, target_tests):
config_dir = get_azdev_config_dir()
test_index_dir = os.path.join(config_dir, 'test_index')
make_dirs(test_index_dir)
test_index_path = os.path.join(test_index_dir, '{}.json'.format(profile))
test_index = {}
if discover:
test_index = _discover_tests(profile, target_tests)
with open(test_index_path, 'w') as f:
f.write(json.dumps(test_index))
display('\ntest index updated: {}'.format(test_index_path))
elif os.path.isfile(test_index_path):
with open(test_index_path, 'r') as f:
test_index = json.loads(''.join(f.readlines()))
display('\ntest index found: {}'.format(test_index_path))
else:
test_index = _discover_tests(profile, target_tests)
with open(test_index_path, 'w') as f:
f.write(json.dumps(test_index))
display('\ntest index created: {}'.format(test_index_path))
return test_index