def validate_assets()

in scripts/azureml-assets/azureml/assets/validate_assets.py [0:0]


def validate_assets(input_dirs: List[Path],
                    asset_config_filename: str,
                    model_validation_results_dir: str = None,
                    changed_files: List[Path] = None,
                    check_names: bool = False,
                    check_names_skip_pattern: re.Pattern = None,
                    check_images: bool = False,
                    check_categories: bool = False,
                    check_build_context: bool = False,
                    check_tests: bool = False,
                    check_environment_version: bool = False) -> bool:
    """Validate assets.

    Args:
        input_dirs (List[Path]): Directories containing assets.
        asset_config_filename (str): Asset config filename to search for.
        model_validation_results_dir (str, optional): Dir containing model validation results
        changed_files (List[Path], optional): List of changed files, used to filter assets. Defaults to None.
        check_names (bool, optional): Whether to check asset names. Defaults to False.
        check_names_skip_pattern (re.Pattern, optional): Regex pattern to skip name validation. Defaults to None.
        check_images (bool, optional): Whether to check image names. Defaults to False.
        check_categories (bool, optional): Whether to check asset categories. Defaults to False.
        check_build_context (bool, optional): Whether to check environment build context. Defaults to False.
        check_tests (bool, optional): Whether to check test references. Defaults to False.
        check_environment_version (bool, optional): Whether to check environment version. Defaults to False.

    Raises:
        ValidationException: If validation fails.

    Returns:
        bool: True if assets were successfully validated, otherwise False.
    """
    # Gather list of just changed assets, for later filtering
    changed_assets = util.find_asset_config_files(input_dirs, asset_config_filename, changed_files) if changed_files else None  # noqa: E501
    validated_model_map = get_validated_models_assets_map(model_validation_results_dir)

    # Find assets under input dirs
    asset_count = 0
    error_count = 0
    asset_dirs = defaultdict(list)
    image_names = defaultdict(list)
    for asset_config_path in util.find_asset_config_files(input_dirs, asset_config_filename):
        asset_count += 1
        # Errors only "count" if changed_files was None or the asset was changed
        validate_this = changed_assets is None or asset_config_path in changed_assets

        # Load config
        try:
            asset_config = assets.AssetConfig(asset_config_path)
        except Exception as e:
            if validate_this:
                _log_error(asset_config_path, e)
                error_count += 1
            else:
                _log_warning(asset_config_path, e)
            continue

        # Extract model variant info from spec
        variant_info = None
        if asset_config.type == assets.AssetType.MODEL:
            with open(asset_config.spec_with_path, "r") as f:
                spec_config = yaml.safe_load(f)
                variant_info = spec_config.get("variantInfo")
                if variant_info is not None:
                    spec_config.pop("variantInfo")

            if variant_info is not None:
                logger.print(f"Found variantInfo in spec, popping out info and rewriting spec. "
                             f"variantInfo: {variant_info}")
                with open(asset_config.spec_with_path, "w") as f:
                    yaml.dump(spec_config, f)

        # Populate dictionary of asset names to asset config paths
        asset_dirs[f"{asset_config.type.value} {asset_config.name}"].append(asset_config_path)

        # validated_model_map would be empty for non-drop scenario
        if asset_config.type == assets.AssetType.MODEL:
            error_count += validate_model_spec(asset_config)
            # should run during drop creation only
            if validated_model_map:
                error_count += confirm_model_validation_results(
                    asset_config,
                    validated_model_map.get(asset_config.name, None)
                )

        # Populate dictionary of image names to asset config paths
        environment_config = None
        if asset_config.type == assets.AssetType.ENVIRONMENT:
            try:
                environment_config = asset_config.extra_config_as_object()

                # Store fully qualified image name
                image_name = environment_config.image_name
                if environment_config.publish_location:
                    image_name = f"{environment_config.publish_location.value}/{image_name}"
                image_names[image_name].append(asset_config.file_path)
            except Exception as e:
                if validate_this:
                    _log_error(environment_config.file_name_with_path, e)
                    error_count += 1
                else:
                    _log_warning(environment_config.file_name_with_path, e)

        # Checks for changed assets only, or all assets if changed_files was None
        if validate_this:
            # Validate name
            if check_names:
                if check_names_skip_pattern is None or not check_names_skip_pattern.fullmatch(asset_config.full_name):
                    error_count += validate_name(asset_config)
                else:
                    logger.log_debug(f"Skipping name validation for {asset_config.full_name}")

            # Validate pytest information
            if check_tests:
                error_count += validate_tests(asset_config)

            if asset_config.type == assets.AssetType.ENVIRONMENT:
                # Validate Dockerfile
                error_count += validate_dockerfile(asset_config.extra_config_as_object())
                if check_build_context:
                    error_count += validate_build_context(asset_config.extra_config_as_object())

                # Validate environment version
                if check_environment_version:
                    error_count += validate_environment_version(asset_config)

            if asset_config.type == assets.AssetType.EVALUATIONRESULT:
                error_count += validate_tags(asset_config, 'evaluationresult/tag_values_shared.yaml')

                asset_spec = asset_config._spec._yaml
                evaluation_type = asset_spec.get('tags', {}).get('evaluation_type', None)

                evaluation_tag_files = {
                    'text_generation': 'evaluationresult/tag_values_text_generation.yaml',
                    'text_embeddings': 'evaluationresult/tag_values_text_embeddings.yaml',
                    'vision': 'evaluationresult/tag_values_vision.yaml',
                    'text_quality': 'evaluationresult/tag_values_text_quality.yaml',
                    'text_performance': 'evaluationresult/tag_values_text_performance.yaml',
                    'text_cost': 'evaluationresult/tag_values_text_cost.yaml'
                }

                if evaluation_type in evaluation_tag_files:
                    error_count += validate_tags(asset_config, evaluation_tag_files[evaluation_type])
                else:
                    _log_error(
                        asset_config.file_name_with_path,
                        f"Asset '{asset_config.name}' has unknown evaluation_type: '{evaluation_type}'"
                    )
                    error_count += 1

            if asset_config.type == assets.AssetType.PROMPT:
                error_count += validate_tags(asset_config, 'tag_values_shared.yaml')
                error_count += validate_tags(asset_config, 'tag_values_prompt.yaml')

            # Validate categories
            if check_categories:
                error_count += validate_categories(asset_config)

            # Validate specific asset types
            if environment_config is not None:
                if check_images:
                    # Check image name
                    error_count += validate_image_publishing(asset_config, environment_config)

            # Validate spec
            try:
                spec = asset_config.spec_as_object()

                # Ensure name and version aren't inconsistent
                if not assets.Config._contains_template(spec.name) and asset_config.name != spec.name:
                    raise ValidationException(f"Asset and spec name mismatch: {asset_config.name} != {spec.name}")
                if not assets.Config._contains_template(spec.version) and asset_config.version != spec.version:
                    raise ValidationException(f"Asset and spec version mismatch: {asset_config.version} != {spec.version}")  # noqa: E501
            except Exception as e:
                _log_error(asset_config.spec_with_path, e)
                error_count += 1

        # Write variantInfo back to spec
        if variant_info is not None:
            spec_config["variantInfo"] = variant_info
            with open(asset_config.spec_with_path, "w") as f:
                yaml.dump(spec_config, f)

    # Ensure unique assets
    for type_and_name, dirs in asset_dirs.items():
        if len(dirs) > 1:
            dirs_str = [d.as_posix() for d in dirs]
            logger.log_error(f"{type_and_name} found in multiple asset YAMLs: {dirs_str}")
            error_count += 1

    # Ensure unique image names
    for image_name, dirs in image_names.items():
        if len(dirs) > 1:
            dirs_str = [d.as_posix() for d in dirs]
            logger.log_error(f"{image_name} found in multiple assets: {dirs_str}")
            error_count += 1

    logger.print(f"Found {error_count} error(s) in {asset_count} asset(s)")
    return error_count == 0