atr/tasks/checks/zipformat.py (268 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import logging
import os
import zipfile
from typing import Any, Final
import atr.tasks.checks as checks
import atr.tasks.checks.license as license
_LOGGER: Final = logging.getLogger(__name__)
async def integrity(args: checks.FunctionArguments) -> str | None:
    """Record whether the zip artifact can be opened and its members listed.

    The zip inspection itself is delegated to ``_integrity_check_core_logic``
    via ``asyncio.to_thread``; the outcome is reported through the recorder
    obtained from ``args``. This function always returns ``None``.
    """
    recorder = await args.recorder()
    artifact_abs_path = await recorder.abs_path()
    if not artifact_abs_path:
        # Without an artifact path there is nothing to inspect
        return None

    _LOGGER.info(f"Checking zip integrity for {artifact_abs_path} (rel: {args.primary_rel_path})")

    try:
        outcome = await asyncio.to_thread(_integrity_check_core_logic, str(artifact_abs_path))
        if not outcome.get("error"):
            await recorder.success(f"Zip archive integrity OK ({outcome['member_count']} members)", outcome)
        else:
            await recorder.failure(outcome["error"], outcome)
    except Exception as exc:
        # Anything unexpected is recorded as a failure instead of propagating
        await recorder.failure("Error checking zip integrity", {"error": str(exc)})
    return None
async def license_files(args: checks.FunctionArguments) -> str | None:
    """Record whether LICENSE and NOTICE exist in the zip and are non-empty.

    The archive inspection is delegated to ``_license_files_check_core_logic_zip``
    via ``asyncio.to_thread``. A failure message lists each file that is missing
    or invalid. This function always returns ``None``.
    """
    recorder = await args.recorder()
    artifact_abs_path = await recorder.abs_path()
    if not artifact_abs_path:
        return None

    _LOGGER.info(f"Checking zip license files for {artifact_abs_path} (rel: {args.primary_rel_path})")

    try:
        outcome = await asyncio.to_thread(_license_files_check_core_logic_zip, str(artifact_abs_path))
        if outcome.get("error"):
            await recorder.failure(outcome["error"], outcome)
        elif outcome.get("license_valid") and outcome.get("notice_valid"):
            await recorder.success("LICENSE and NOTICE files present and valid in zip", outcome)
        else:
            # (found key, valid key, message when missing, message when invalid)
            expectations = (
                ("license_found", "license_valid", "LICENSE missing", "LICENSE invalid or empty"),
                ("notice_found", "notice_valid", "NOTICE missing", "NOTICE invalid or empty"),
            )
            problems = []
            for found_key, valid_key, missing_text, invalid_text in expectations:
                if not outcome.get(found_key):
                    problems.append(missing_text)
                elif not outcome.get(valid_key):
                    problems.append(invalid_text)
            summary = ", ".join(problems) if problems else "Issues found with LICENSE or NOTICE files"
            await recorder.failure(summary, outcome)
    except Exception as exc:
        # Anything unexpected is recorded as a failure instead of propagating
        await recorder.failure("Error checking zip license files", {"error": str(exc)})
    return None
async def license_headers(args: checks.FunctionArguments) -> str | None:
    """Record whether the checked source files in the zip carry valid license headers.

    The archive scan is delegated to ``_license_headers_check_core_logic_zip``
    via ``asyncio.to_thread``. This function always returns ``None``.
    """
    recorder = await args.recorder()
    artifact_abs_path = await recorder.abs_path()
    if not artifact_abs_path:
        return None

    _LOGGER.info(f"Checking zip license headers for {artifact_abs_path} (rel: {args.primary_rel_path})")

    try:
        outcome = await asyncio.to_thread(_license_headers_check_core_logic_zip, str(artifact_abs_path))
        if outcome.get("error_message"):
            # The archive itself could not be processed
            await recorder.failure(outcome["error_message"], outcome)
        elif outcome.get("valid"):
            await recorder.success(
                f"License headers OK ({outcome.get('files_checked', 0)} files checked)", outcome
            )
        else:
            offending = outcome.get("files_without_headers", [])
            await recorder.failure(
                f"{len(offending)} file(s) missing or having invalid license headers", outcome
            )
    except Exception as exc:
        # Anything unexpected is recorded as a failure instead of propagating
        await recorder.failure("Error checking zip license headers", {"error": str(exc)})
    return None
async def structure(args: checks.FunctionArguments) -> str | None:
    """Record whether the zip has a single root directory matching the artifact name.

    The archive inspection is delegated to ``_structure_check_core_logic`` via
    ``asyncio.to_thread``. A ``warning`` entry in its result takes precedence
    over an ``error`` entry. This function always returns ``None``.
    """
    recorder = await args.recorder()
    artifact_abs_path = await recorder.abs_path()
    if not artifact_abs_path:
        return None

    _LOGGER.info(f"Checking zip structure for {artifact_abs_path} (rel: {args.primary_rel_path})")

    try:
        outcome = await asyncio.to_thread(_structure_check_core_logic, str(artifact_abs_path))
        warning_text = outcome.get("warning")
        error_text = outcome.get("error")
        if warning_text:
            await recorder.warning(warning_text, outcome)
        elif error_text:
            await recorder.failure(error_text, outcome)
        else:
            await recorder.success(f"Zip structure OK (root: {outcome['root_dir']})", outcome)
    except Exception as exc:
        # Anything unexpected is recorded as a failure instead of propagating
        await recorder.failure("Error checking zip structure", {"error": str(exc)})
    return None
def _integrity_check_core_logic(artifact_path: str) -> dict[str, Any]:
    """Open the zip at ``artifact_path`` and count its members.

    Returns ``{"member_count": n}`` on success, otherwise ``{"error": message}``
    describing why the archive could not be read.
    """
    try:
        with zipfile.ZipFile(artifact_path, "r") as archive:
            # Listing the members is a cheap sanity check
            # archive.testzip() could be used for CRC checks, but would be slower
            member_count = len(archive.namelist())
    except zipfile.BadZipFile as exc:
        return {"error": f"Bad zip file: {exc}"}
    except FileNotFoundError:
        return {"error": "File not found"}
    except Exception as exc:
        return {"error": f"Unexpected error: {exc}"}
    return {"member_count": member_count}
def _license_files_check_core_logic_zip(artifact_path: str) -> dict[str, Any]:
    """Look for ``<root>/LICENSE`` and ``<root>/NOTICE`` inside a zip archive.

    Returns a dict with ``root_dir`` and the ``*_found`` / ``*_valid`` flags for
    both files, or ``{"error": message}`` when the archive is empty, has no
    detectable root directory, or cannot be read.
    """
    # TODO: Obviously we want to reuse the license files check logic from license.py
    # But we'd need to have task dependencies to do that, ideally
    try:
        with zipfile.ZipFile(artifact_path, "r") as archive:
            names = archive.namelist()
            if not names:
                return {"error": "Archive is empty"}

            root_dir = _license_files_find_root_dir_zip(names)
            if not root_dir:
                return {"error": "Could not determine root directory"}

            known_names = set(names)

            def probe(expected_path: str) -> tuple[bool, bool]:
                # Only open the member when the archive actually lists it
                if expected_path not in known_names:
                    return False, False
                return _license_files_check_file_zip(archive, artifact_path, expected_path)

            license_found, license_valid = probe(root_dir + "/LICENSE")
            notice_found, notice_valid = probe(root_dir + "/NOTICE")

            return {
                "root_dir": root_dir,
                "license_found": license_found,
                "license_valid": license_valid,
                "notice_found": notice_found,
                "notice_valid": notice_valid,
            }
    except zipfile.BadZipFile as exc:
        return {"error": f"Bad zip file: {exc}"}
    except FileNotFoundError:
        return {"error": "File not found"}
    except Exception as exc:
        return {"error": f"Unexpected error: {exc}"}
def _license_files_check_file_zip(zf: zipfile.ZipFile, artifact_path: str, expected_path: str) -> tuple[bool, bool]:
    """Report whether ``expected_path`` exists in the zip and has non-blank content.

    Returns ``(found, valid)``. ``found`` is set once the member has been opened;
    ``valid`` is set when its content is non-empty after stripping whitespace.
    Read errors other than a missing member are logged as warnings, not raised.
    """
    is_present = False
    has_content = False
    try:
        with zf.open(expected_path) as member:
            is_present = True
            # TODO: Add more specific NOTICE checks if needed
            has_content = bool(member.read().strip())
    except KeyError:
        # The member does not exist in the archive
        pass
    except Exception as exc:
        _LOGGER.warning(f"Error reading {os.path.basename(expected_path)} in zip {artifact_path}: {exc}")
    return is_present, has_content
def _license_files_find_root_dir_zip(members: list[str]) -> str | None:
"""Find the root directory in a list of zip members."""
for member in members:
if "/" in member:
return member.split("/", 1)[0]
return None
def _license_headers_check_core_logic_zip(artifact_path: str) -> dict[str, Any]:
    """Check the license header of every eligible file inside a zip archive.

    Returns a dict with ``valid``, ``files_checked``, ``files_without_headers``
    and ``error_message`` (``None``) after a completed scan, or a dict with
    ``valid`` set to ``False`` and an ``error_message`` when the archive cannot
    be read.
    """
    checked_count = 0
    problem_entries: list[str] = []
    try:
        with zipfile.ZipFile(artifact_path, "r") as archive:
            for info in archive.infolist():
                if info.is_dir():
                    continue

                path_in_zip = info.filename
                # Lower-cased extension without the leading dot, e.g. "py"
                suffix = os.path.splitext(path_in_zip)[1].lower().lstrip(".")
                if not _license_headers_check_should_check_zip(path_in_zip, suffix):
                    continue

                checked_count += 1
                header_ok, read_error = _license_headers_check_single_file_zip(archive, info, suffix)
                if read_error:
                    # The message already includes the path and error type
                    problem_entries.append(read_error)
                elif not header_ok:
                    # A header mismatch is recorded by path alone
                    problem_entries.append(path_in_zip)

        return {
            "valid": not problem_entries,
            "files_checked": checked_count,
            "files_without_headers": problem_entries,
            "error_message": None,
        }
    except zipfile.BadZipFile as exc:
        return {"valid": False, "error_message": f"Bad zip file: {exc}"}
    except FileNotFoundError:
        return {"valid": False, "error_message": "File not found"}
    except Exception as exc:
        return {"valid": False, "error_message": f"Unexpected error: {exc}"}
def _license_headers_check_should_check_zip(member_path: str, extension: str) -> bool:
    """Decide whether a zip member should have its license header checked.

    A member qualifies when ``.<extension>`` matches one of
    ``license.INCLUDED_PATTERNS`` and ``license.COMMENT_STYLES`` has a truthy
    entry for the extension. A matching extension without a comment style is
    logged as a warning and skipped.
    """
    dotted_extension = f".{extension}"
    is_included = any(license.re.match(pattern, dotted_extension) for pattern in license.INCLUDED_PATTERNS)
    if not is_included:
        return False

    # Also check whether we have a comment style defined for it
    if license.COMMENT_STYLES.get(extension):
        return True

    _LOGGER.warning(f"No comment style defined for included extension '{extension}' in {member_path}")
    return False
def _license_headers_check_single_file_zip(
zf: zipfile.ZipFile, member_info: zipfile.ZipInfo, extension: str
) -> tuple[bool, str | None]:
"""Check the license header of a single file within a zip. Returns (is_valid, error_message)."""
member_path = member_info.filename
try:
with zf.open(member_path) as file_in_zip:
content_bytes = file_in_zip.read(2048)
header_bytes = license.strip_comments(content_bytes, extension)
expected_header_bytes = license.APACHE_LICENSE_HEADER
if header_bytes == expected_header_bytes:
return True, None
else:
# Header mismatch
return False, None
except Exception as read_error:
return False, f"{member_path} (Read Error: {read_error})"
def _structure_check_core_logic(artifact_path: str) -> dict[str, Any]:
    """Check that the zip has one root directory named after the artifact file.

    Returns ``{"root_dir": name}`` on success, ``{"warning": message}`` when the
    root directory name differs from the expected one, and ``{"error": message}``
    for every other problem, including an unreadable archive.
    """
    try:
        with zipfile.ZipFile(artifact_path, "r") as archive:
            names = archive.namelist()
            if not names:
                return {"error": "Archive is empty"}

            # The expected root is the archive file name without its ".zip" suffix
            # TODO: Airavata has e.g. "-source-release"
            # It would be useful if there were a function in analysis.py for stripping
            # suffixes such as "-src", "-bin" and "-dist"
            # But the root directory should probably always match the name of the file sans suffix
            expected_root = os.path.basename(artifact_path).removesuffix(".zip")

            root_dirs, non_rooted_files = _structure_check_core_logic_find_roots(archive, names)
            actual_root, problem = _structure_check_core_logic_validate_root(
                names, root_dirs, non_rooted_files, expected_root
            )

            if problem:
                # Only a root directory name mismatch is downgraded to a warning
                severity = "warning" if problem.startswith("Root directory mismatch") else "error"
                return {severity: problem}
            if actual_root:
                return {"root_dir": actual_root}
            return {"error": "Unknown structure validation error"}
    except zipfile.BadZipFile as exc:
        return {"error": f"Bad zip file: {exc}"}
    except FileNotFoundError:
        return {"error": "File not found"}
    except Exception as exc:
        return {"error": f"Unexpected error: {exc}"}
def _structure_check_core_logic_find_roots(zf: zipfile.ZipFile, members: list[str]) -> tuple[set[str], list[str]]:
    """Split member names into top-level directory names and loose top-level files.

    Returns ``(root_dirs, non_rooted_files)``: the set of first path components
    of members containing ``/``, and the list of slash-free members that
    ``zipfile.Path`` does not report as directories.
    """
    top_level_dirs: set[str] = set()
    loose_files: list[str] = []
    for name in members:
        head, separator, _ = name.partition("/")
        if separator:
            # Anything nested contributes its first path component
            top_level_dirs.add(head)
            continue
        if zipfile.Path(zf, name).is_dir():
            continue
        loose_files.append(name)
    return top_level_dirs, loose_files
def _structure_check_core_logic_validate_root(
members: list[str], root_dirs: set[str], non_rooted_files: list[str], expected_root: str
) -> tuple[str | None, str | None]:
"""Validate the identified root structure against expectations."""
if non_rooted_files:
return None, f"Files found directly in root: {non_rooted_files}"
if not root_dirs:
return None, "No directories found in archive"
if len(root_dirs) > 1:
return None, f"Multiple root directories found: {sorted(list(root_dirs))}"
actual_root = next(iter(root_dirs))
if actual_root != expected_root:
return None, f"Root directory mismatch. Expected '{expected_root}', found '{actual_root}'"
# Check whether all members are under the correct root directory
for member in members:
if member == actual_root.rstrip("/"):
continue
if not member.startswith(expected_root):
return None, f"Member found outside expected root directory: {member}"
return actual_root, None