atr/tasks/checks/zipformat.py (268 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import asyncio import logging import os import zipfile from typing import Any, Final import atr.tasks.checks as checks import atr.tasks.checks.license as license _LOGGER: Final = logging.getLogger(__name__) async def integrity(args: checks.FunctionArguments) -> str | None: """Check that the zip archive is not corrupted and can be opened.""" recorder = await args.recorder() if not (artifact_abs_path := await recorder.abs_path()): return None _LOGGER.info(f"Checking zip integrity for {artifact_abs_path} (rel: {args.primary_rel_path})") try: result_data = await asyncio.to_thread(_integrity_check_core_logic, str(artifact_abs_path)) if result_data.get("error"): await recorder.failure(result_data["error"], result_data) else: await recorder.success(f"Zip archive integrity OK ({result_data['member_count']} members)", result_data) except Exception as e: await recorder.failure("Error checking zip integrity", {"error": str(e)}) return None async def license_files(args: checks.FunctionArguments) -> str | None: """Check that the LICENSE and NOTICE files exist and are valid within the zip.""" recorder = await args.recorder() if not (artifact_abs_path := await recorder.abs_path()): return None _LOGGER.info(f"Checking zip license files for {artifact_abs_path} (rel: {args.primary_rel_path})") try: result_data = await asyncio.to_thread(_license_files_check_core_logic_zip, str(artifact_abs_path)) if result_data.get("error"): await recorder.failure(result_data["error"], result_data) elif result_data.get("license_valid") and result_data.get("notice_valid"): await recorder.success("LICENSE and NOTICE files present and valid in zip", result_data) else: issues = [] if not result_data.get("license_found"): issues.append("LICENSE missing") elif not result_data.get("license_valid"): issues.append("LICENSE invalid or empty") if not result_data.get("notice_found"): issues.append("NOTICE missing") elif not result_data.get("notice_valid"): issues.append("NOTICE invalid or empty") issue_str = ", ".join(issues) if issues else "Issues found with LICENSE or NOTICE files" await recorder.failure(issue_str, result_data) except Exception as e: await recorder.failure("Error checking zip license files", {"error": str(e)}) return None async def license_headers(args: checks.FunctionArguments) -> str | None: """Check that all source files within the zip have valid license headers.""" recorder = await args.recorder() if not (artifact_abs_path := await recorder.abs_path()): return None _LOGGER.info(f"Checking zip license headers for {artifact_abs_path} (rel: {args.primary_rel_path})") try: result_data = await asyncio.to_thread(_license_headers_check_core_logic_zip, str(artifact_abs_path)) if result_data.get("error_message"): await recorder.failure(result_data["error_message"], result_data) elif not result_data.get("valid"): num_issues = len(result_data.get("files_without_headers", [])) failure_msg = f"{num_issues} file(s) missing or having invalid license headers" await recorder.failure(failure_msg, result_data) else: await recorder.success( f"License headers OK ({result_data.get('files_checked', 0)} files checked)", result_data ) except Exception as e: await recorder.failure("Error checking zip license headers", {"error": str(e)}) return None async def structure(args: checks.FunctionArguments) -> str | None: """Check that the zip archive has a single root directory matching the artifact name.""" recorder = await args.recorder() if not (artifact_abs_path := await recorder.abs_path()): return None _LOGGER.info(f"Checking zip structure for {artifact_abs_path} (rel: {args.primary_rel_path})") try: result_data = await asyncio.to_thread(_structure_check_core_logic, str(artifact_abs_path)) if result_data.get("warning"): await recorder.warning(result_data["warning"], result_data) elif result_data.get("error"): await recorder.failure(result_data["error"], result_data) else: await recorder.success(f"Zip structure OK (root: {result_data['root_dir']})", result_data) except Exception as e: await recorder.failure("Error checking zip structure", {"error": str(e)}) return None def _integrity_check_core_logic(artifact_path: str) -> dict[str, Any]: """Verify that a zip file can be opened and its members listed.""" try: with zipfile.ZipFile(artifact_path, "r") as zf: # This is a simple check using list members # We can use zf.testzip() for CRC checks if needed, though this will be slower member_list = zf.namelist() return {"member_count": len(member_list)} except zipfile.BadZipFile as e: return {"error": f"Bad zip file: {e}"} except FileNotFoundError: return {"error": "File not found"} except Exception as e: return {"error": f"Unexpected error: {e}"} def _license_files_check_core_logic_zip(artifact_path: str) -> dict[str, Any]: """Verify LICENSE and NOTICE files within a zip archive.""" # TODO: Obviously we want to reuse the license files check logic from license.py # But we'd need to have task dependencies to do that, ideally try: with zipfile.ZipFile(artifact_path, "r") as zf: members = zf.namelist() if not members: return {"error": "Archive is empty"} root_dir = _license_files_find_root_dir_zip(members) # _LOGGER.info(f"Root dir of {artifact_path}: {root_dir}") if not root_dir: return {"error": "Could not determine root directory"} expected_license_path = root_dir + "/LICENSE" expected_notice_path = root_dir + "/NOTICE" member_set = set(members) license_found, license_valid = ( _license_files_check_file_zip(zf, artifact_path, expected_license_path) if (expected_license_path in member_set) else (False, False) ) notice_found, notice_valid = ( _license_files_check_file_zip(zf, artifact_path, expected_notice_path) if (expected_notice_path in member_set) else (False, False) ) return { "root_dir": root_dir, "license_found": license_found, "license_valid": license_valid, "notice_found": notice_found, "notice_valid": notice_valid, } except zipfile.BadZipFile as e: return {"error": f"Bad zip file: {e}"} except FileNotFoundError: return {"error": "File not found"} except Exception as e: return {"error": f"Unexpected error: {e}"} def _license_files_check_file_zip(zf: zipfile.ZipFile, artifact_path: str, expected_path: str) -> tuple[bool, bool]: """Check for the presence and basic validity of a specific file in a zip.""" found = False valid = False try: with zf.open(expected_path) as file_handle: found = True content = file_handle.read().strip() if content: # TODO: Add more specific NOTICE checks if needed valid = True except KeyError: # File not found in zip ... except Exception as e: filename = os.path.basename(expected_path) _LOGGER.warning(f"Error reading {filename} in zip {artifact_path}: {e}") return found, valid def _license_files_find_root_dir_zip(members: list[str]) -> str | None: """Find the root directory in a list of zip members.""" for member in members: if "/" in member: return member.split("/", 1)[0] return None def _license_headers_check_core_logic_zip(artifact_path: str) -> dict[str, Any]: """Verify license headers for files within a zip archive.""" files_checked = 0 files_with_issues: list[str] = [] try: with zipfile.ZipFile(artifact_path, "r") as zf: members = zf.infolist() for member_info in members: if member_info.is_dir(): continue member_path = member_info.filename _, extension = os.path.splitext(member_path) extension = extension.lower().lstrip(".") if not _license_headers_check_should_check_zip(member_path, extension): continue files_checked += 1 is_valid, error_msg = _license_headers_check_single_file_zip(zf, member_info, extension) if error_msg: # Already includes path and error type files_with_issues.append(error_msg) elif not is_valid: # Just append path for header mismatch files_with_issues.append(member_path) if files_with_issues: return { "valid": False, "files_checked": files_checked, "files_without_headers": files_with_issues, "error_message": None, } else: return { "valid": True, "files_checked": files_checked, "files_without_headers": [], "error_message": None, } except zipfile.BadZipFile as e: return {"valid": False, "error_message": f"Bad zip file: {e}"} except FileNotFoundError: return {"valid": False, "error_message": "File not found"} except Exception as e: return {"valid": False, "error_message": f"Unexpected error: {e}"} def _license_headers_check_should_check_zip(member_path: str, extension: str) -> bool: """Determine whether a file in a zip should be checked for license headers.""" for pattern in license.INCLUDED_PATTERNS: if license.re.match(pattern, f".{extension}"): # Also check whether we have a comment style defined for it if license.COMMENT_STYLES.get(extension): return True else: _LOGGER.warning(f"No comment style defined for included extension '{extension}' in {member_path}") return False return False def _license_headers_check_single_file_zip( zf: zipfile.ZipFile, member_info: zipfile.ZipInfo, extension: str ) -> tuple[bool, str | None]: """Check the license header of a single file within a zip. Returns (is_valid, error_message).""" member_path = member_info.filename try: with zf.open(member_path) as file_in_zip: content_bytes = file_in_zip.read(2048) header_bytes = license.strip_comments(content_bytes, extension) expected_header_bytes = license.APACHE_LICENSE_HEADER if header_bytes == expected_header_bytes: return True, None else: # Header mismatch return False, None except Exception as read_error: return False, f"{member_path} (Read Error: {read_error})" def _structure_check_core_logic(artifact_path: str) -> dict[str, Any]: """Verify the internal structure of the zip archive.""" try: with zipfile.ZipFile(artifact_path, "r") as zf: members = zf.namelist() if not members: return {"error": "Archive is empty"} base_name = os.path.basename(artifact_path) name_part = base_name.removesuffix(".zip") # # TODO: Airavata has e.g. "-source-release" # # It would be useful if there were a function in analysis.py for stripping these # # But the root directory should probably always match the name of the file sans suffix # # (This would also be easier to implement) # if name_part.endswith(("-src", "-bin", "-dist")): # name_part = "-".join(name_part.split("-")[:-1]) expected_root = name_part root_dirs, non_rooted_files = _structure_check_core_logic_find_roots(zf, members) actual_root, error_msg = _structure_check_core_logic_validate_root( members, root_dirs, non_rooted_files, expected_root ) if error_msg: if error_msg.startswith("Root directory mismatch"): return {"warning": error_msg} else: return {"error": error_msg} if actual_root: return {"root_dir": actual_root} return {"error": "Unknown structure validation error"} except zipfile.BadZipFile as e: return {"error": f"Bad zip file: {e}"} except FileNotFoundError: return {"error": "File not found"} except Exception as e: return {"error": f"Unexpected error: {e}"} def _structure_check_core_logic_find_roots(zf: zipfile.ZipFile, members: list[str]) -> tuple[set[str], list[str]]: """Identify root directories and non-rooted files in a zip archive.""" root_dirs: set[str] = set() non_rooted_files: list[str] = [] for member in members: if "/" in member: root_dirs.add(member.split("/", 1)[0]) elif not zipfile.Path(zf, member).is_dir(): non_rooted_files.append(member) return root_dirs, non_rooted_files def _structure_check_core_logic_validate_root( members: list[str], root_dirs: set[str], non_rooted_files: list[str], expected_root: str ) -> tuple[str | None, str | None]: """Validate the identified root structure against expectations.""" if non_rooted_files: return None, f"Files found directly in root: {non_rooted_files}" if not root_dirs: return None, "No directories found in archive" if len(root_dirs) > 1: return None, f"Multiple root directories found: {sorted(list(root_dirs))}" actual_root = next(iter(root_dirs)) if actual_root != expected_root: return None, f"Root directory mismatch. Expected '{expected_root}', found '{actual_root}'" # Check whether all members are under the correct root directory for member in members: if member == actual_root.rstrip("/"): continue if not member.startswith(expected_root): return None, f"Member found outside expected root directory: {member}" return actual_root, None