in scripts/azureml-assets/azureml/assets/validate_assets.py [0:0]
def validate_assets(input_dirs: List[Path],
                    asset_config_filename: str,
                    model_validation_results_dir: str = None,
                    changed_files: List[Path] = None,
                    check_names: bool = False,
                    check_names_skip_pattern: re.Pattern = None,
                    check_images: bool = False,
                    check_categories: bool = False,
                    check_build_context: bool = False,
                    check_tests: bool = False,
                    check_environment_version: bool = False) -> bool:
    """Validate assets.

    Every asset config found under ``input_dirs`` is loaded and counted. Per-asset checks only
    run (and their errors only count) for assets selected by ``changed_files``, or for all assets
    when ``changed_files`` is empty or None. Uniqueness of asset names and image names is always
    checked across all discovered assets.

    For model assets whose spec contains a ``variantInfo`` key, the spec file is temporarily
    rewritten without that key while the asset is validated, and the key is written back
    afterwards, even if a check raises.

    Args:
        input_dirs (List[Path]): Directories containing assets.
        asset_config_filename (str): Asset config filename to search for.
        model_validation_results_dir (str, optional): Dir containing model validation results.
        changed_files (List[Path], optional): List of changed files, used to filter assets. Defaults to None.
        check_names (bool, optional): Whether to check asset names. Defaults to False.
        check_names_skip_pattern (re.Pattern, optional): Regex pattern to skip name validation. Defaults to None.
        check_images (bool, optional): Whether to check image names. Defaults to False.
        check_categories (bool, optional): Whether to check asset categories. Defaults to False.
        check_build_context (bool, optional): Whether to check environment build context. Defaults to False.
        check_tests (bool, optional): Whether to check test references. Defaults to False.
        check_environment_version (bool, optional): Whether to check environment version. Defaults to False.

    Returns:
        bool: True if assets were successfully validated, otherwise False.

    Note:
        Name/version mismatches between asset config and spec are raised internally as
        ValidationException, but they are caught here, logged and counted as errors rather than
        propagated to the caller.
    """
    # Gather the changed assets, for later filtering. A set keeps the per-asset membership test cheap.
    changed_assets = set(util.find_asset_config_files(input_dirs, asset_config_filename, changed_files)) if changed_files else None  # noqa: E501
    validated_model_map = get_validated_models_assets_map(model_validation_results_dir)

    # Tag value files to validate against, keyed by the spec's evaluation_type tag
    evaluation_tag_files = {
        'text_generation': 'evaluationresult/tag_values_text_generation.yaml',
        'text_embeddings': 'evaluationresult/tag_values_text_embeddings.yaml',
        'vision': 'evaluationresult/tag_values_vision.yaml',
        'text_quality': 'evaluationresult/tag_values_text_quality.yaml',
        'text_performance': 'evaluationresult/tag_values_text_performance.yaml',
        'text_cost': 'evaluationresult/tag_values_text_cost.yaml'
    }

    # Find assets under input dirs
    asset_count = 0
    error_count = 0
    asset_dirs = defaultdict(list)
    image_names = defaultdict(list)
    for asset_config_path in util.find_asset_config_files(input_dirs, asset_config_filename):
        asset_count += 1

        # Errors only "count" if changed_files was None or the asset was changed
        validate_this = changed_assets is None or asset_config_path in changed_assets

        # Load config
        try:
            asset_config = assets.AssetConfig(asset_config_path)
        except Exception as e:
            if validate_this:
                _log_error(asset_config_path, e)
                error_count += 1
            else:
                _log_warning(asset_config_path, e)
            continue

        # Extract model variant info from spec
        variant_info = None
        spec_config = None
        if asset_config.type == assets.AssetType.MODEL:
            with open(asset_config.spec_with_path, "r") as f:
                spec_config = yaml.safe_load(f)
                variant_info = spec_config.get("variantInfo")
                if variant_info is not None:
                    spec_config.pop("variantInfo")

            if variant_info is not None:
                logger.print(f"Found variantInfo in spec, popping out info and rewriting spec. "
                             f"variantInfo: {variant_info}")
                with open(asset_config.spec_with_path, "w") as f:
                    yaml.dump(spec_config, f)

        try:
            # Populate dictionary of asset names to asset config paths
            asset_dirs[f"{asset_config.type.value} {asset_config.name}"].append(asset_config_path)

            # validated_model_map would be empty for non-drop scenario
            if asset_config.type == assets.AssetType.MODEL:
                error_count += validate_model_spec(asset_config)
                # should run during drop creation only
                if validated_model_map:
                    error_count += confirm_model_validation_results(
                        asset_config,
                        validated_model_map.get(asset_config.name, None)
                    )

            # Populate dictionary of image names to asset config paths
            environment_config = None
            if asset_config.type == assets.AssetType.ENVIRONMENT:
                try:
                    environment_config = asset_config.extra_config_as_object()

                    # Store fully qualified image name
                    image_name = environment_config.image_name
                    if environment_config.publish_location:
                        image_name = f"{environment_config.publish_location.value}/{image_name}"
                    image_names[image_name].append(asset_config.file_path)
                except Exception as e:
                    # environment_config is still None when loading it is what failed, so report
                    # against the asset config file instead of dereferencing None.
                    if environment_config is not None:
                        error_path = environment_config.file_name_with_path
                    else:
                        error_path = asset_config.file_name_with_path
                    if validate_this:
                        _log_error(error_path, e)
                        error_count += 1
                    else:
                        _log_warning(error_path, e)

            # Checks for changed assets only, or all assets if changed_files was None
            if validate_this:
                # Validate name
                if check_names:
                    skip_name_check = (check_names_skip_pattern is not None
                                       and check_names_skip_pattern.fullmatch(asset_config.full_name))
                    if not skip_name_check:
                        error_count += validate_name(asset_config)
                    else:
                        logger.log_debug(f"Skipping name validation for {asset_config.full_name}")

                # Validate pytest information
                if check_tests:
                    error_count += validate_tests(asset_config)

                if asset_config.type == assets.AssetType.ENVIRONMENT:
                    # A failure to load the environment config was already reported above;
                    # re-loading it here would raise the same exception and abort the whole run.
                    if environment_config is not None:
                        # Validate Dockerfile
                        error_count += validate_dockerfile(asset_config.extra_config_as_object())
                        if check_build_context:
                            error_count += validate_build_context(asset_config.extra_config_as_object())

                    # Validate environment version
                    if check_environment_version:
                        error_count += validate_environment_version(asset_config)

                if asset_config.type == assets.AssetType.EVALUATIONRESULT:
                    error_count += validate_tags(asset_config, 'evaluationresult/tag_values_shared.yaml')

                    asset_spec = asset_config._spec._yaml
                    evaluation_type = asset_spec.get('tags', {}).get('evaluation_type', None)

                    if evaluation_type in evaluation_tag_files:
                        error_count += validate_tags(asset_config, evaluation_tag_files[evaluation_type])
                    else:
                        _log_error(
                            asset_config.file_name_with_path,
                            f"Asset '{asset_config.name}' has unknown evaluation_type: '{evaluation_type}'"
                        )
                        error_count += 1

                if asset_config.type == assets.AssetType.PROMPT:
                    error_count += validate_tags(asset_config, 'tag_values_shared.yaml')
                    error_count += validate_tags(asset_config, 'tag_values_prompt.yaml')

                # Validate categories
                if check_categories:
                    error_count += validate_categories(asset_config)

                # Validate specific asset types
                if environment_config is not None:
                    if check_images:
                        # Check image name
                        error_count += validate_image_publishing(asset_config, environment_config)

                # Validate spec
                try:
                    spec = asset_config.spec_as_object()

                    # Ensure name and version aren't inconsistent
                    if not assets.Config._contains_template(spec.name) and asset_config.name != spec.name:
                        raise ValidationException(f"Asset and spec name mismatch: {asset_config.name} != {spec.name}")
                    if not assets.Config._contains_template(spec.version) and asset_config.version != spec.version:
                        raise ValidationException(f"Asset and spec version mismatch: {asset_config.version} != {spec.version}")  # noqa: E501
                except Exception as e:
                    _log_error(asset_config.spec_with_path, e)
                    error_count += 1
        finally:
            # Write variantInfo back to spec. Done in a finally so an unexpected exception from
            # any check above cannot leave the spec file on disk without its variantInfo.
            if variant_info is not None:
                spec_config["variantInfo"] = variant_info
                with open(asset_config.spec_with_path, "w") as f:
                    yaml.dump(spec_config, f)

    # Ensure unique assets
    for type_and_name, dirs in asset_dirs.items():
        if len(dirs) > 1:
            dirs_str = [d.as_posix() for d in dirs]
            logger.log_error(f"{type_and_name} found in multiple asset YAMLs: {dirs_str}")
            error_count += 1

    # Ensure unique image names
    for image_name, dirs in image_names.items():
        if len(dirs) > 1:
            dirs_str = [d.as_posix() for d in dirs]
            logger.log_error(f"{image_name} found in multiple assets: {dirs_str}")
            error_count += 1

    logger.print(f"Found {error_count} error(s) in {asset_count} asset(s)")
    return error_count == 0