# atr/tasks/checks/paths.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
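"""Check file path structure and naming conventions against ASF release policy."""
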
import logging
import pathlib
import re
from typing import Final

import aiofiles.os

import atr.analysis as analysis
import atr.tasks.checks as checks
import atr.util as util
_LOGGER: Final = logging.getLogger(__name__)


async def check(args: checks.FunctionArguments) -> None:
"""Check file path structure and naming conventions against ASF release policy for all files in a release."""
# We refer to the following authoritative policies:
# - Release Creation Process (RCP)
# - Release Distribution Policy (RDP)
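    # Three recorders, keyed by suffix, let errors, warnings, and successes be
    # recorded separately; afresh=True clears any results from a previous run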
recorder_errors = await checks.Recorder.create(
checker=checks.function_key(check) + "_errors",
release_name=args.release_name,
draft_revision=args.draft_revision,
primary_rel_path=None,
afresh=True,
)
recorder_warnings = await checks.Recorder.create(
checker=checks.function_key(check) + "_warnings",
release_name=args.release_name,
draft_revision=args.draft_revision,
primary_rel_path=None,
afresh=True,
)
recorder_success = await checks.Recorder.create(
checker=checks.function_key(check) + "_success",
release_name=args.release_name,
draft_revision=args.draft_revision,
primary_rel_path=None,
afresh=True,
)
# As primary_rel_path is None, the base path is the release candidate draft directory
if not (base_path := await recorder_success.abs_path()):
return
if not await aiofiles.os.path.isdir(base_path):
_LOGGER.error("Base release directory does not exist or is not a directory: %s", base_path)
return
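    # Materialise the listing once; the set makes the companion-file lookups
    # below (.asc, .sha256, .sha512) constant-time membership tests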
relative_paths = [p async for p in util.paths_recursive(base_path)]
    relative_paths_set = {str(p) for p in relative_paths}
for relative_path in relative_paths:
# Delegate processing of each path to the helper function
await _check_path_process_single(
base_path,
relative_path,
recorder_errors,
recorder_warnings,
recorder_success,
relative_paths_set,
)


async def _check_artifact_rules(
base_path: pathlib.Path, relative_path: pathlib.Path, relative_paths: set[str], errors: list[str]
) -> None:
"""Check rules specific to artifact files."""
full_path = base_path / relative_path
# RDP says that .asc is required
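    # Appending to full_path.suffix rather than replacing it preserves compound
    # extensions, e.g. example-1.0.tar.gz -> example-1.0.tar.gz.asc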
asc_path = full_path.with_suffix(full_path.suffix + ".asc")
if not await aiofiles.os.path.exists(asc_path):
errors.append(f"Missing corresponding signature file ({relative_path}.asc)")
# RDP requires one of .sha256 or .sha512
relative_sha256_path = relative_path.with_suffix(relative_path.suffix + ".sha256")
relative_sha512_path = relative_path.with_suffix(relative_path.suffix + ".sha512")
has_sha256 = str(relative_sha256_path) in relative_paths
has_sha512 = str(relative_sha512_path) in relative_paths
if not (has_sha256 or has_sha512):
errors.append(f"Missing corresponding checksum file ({relative_path}.sha256 or {relative_path}.sha512)")
async def _check_metadata_rules(
_base_path: pathlib.Path,
relative_path: pathlib.Path,
relative_paths: set[str],
ext_metadata: str,
errors: list[str],
warnings: list[str],
) -> None:
"""Check rules specific to metadata files (.asc, .sha*, etc.)."""
suffixes = set(relative_path.suffixes)
if ".md5" in suffixes:
# Forbidden by RCP, deprecated by RDP
errors.append("The use of .md5 is forbidden, please use .sha512")
if ".sha1" in suffixes:
# Deprecated by RDP
warnings.append("The use of .sha1 is deprecated, please use .sha512")
if ".sha" in suffixes:
# Discouraged by RDP
warnings.append("The use of .sha is discouraged, please use .sha512")
if ".sig" in suffixes:
        # Forbidden by both RCP and RDP
errors.append("Binary signature files (.sig) are forbidden, please use .asc")
# "Signature and checksum files for verifying distributed artifacts should
# not be provided, unless named as indicated above." (RDP)
# Also .mds is allowed, but we'll ignore that for now
# TODO: Is .mds supported in analysis.METADATA_SUFFIXES?
if ext_metadata not in {".asc", ".sha256", ".sha512", ".md5", ".sha", ".sha1"}:
warnings.append("The use of this metadata file is discouraged")
# Check whether the corresponding artifact exists
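    # Strip the metadata extension to recover the artifact name,
    # e.g. example-1.0.tar.gz.sha512 -> example-1.0.tar.gz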
artifact_path_base = str(relative_path).removesuffix(ext_metadata)
if artifact_path_base not in relative_paths:
errors.append(f"Metadata file exists but corresponding artifact '{artifact_path_base}' is missing")
async def _check_path_process_single(
base_path: pathlib.Path,
relative_path: pathlib.Path,
recorder_errors: checks.Recorder,
recorder_warnings: checks.Recorder,
recorder_success: checks.Recorder,
relative_paths: set[str],
) -> None:
"""Process and check a single path within the release directory."""
full_path = base_path / relative_path
relative_path_str = str(relative_path)
errors: list[str] = []
warnings: list[str] = []
# The Release Distribution Policy specifically allows README and CHANGES, etc.
# We assume that LICENSE and NOTICE are permitted also
if relative_path.name == "KEYS":
errors.append("The KEYS file should be uploaded via the 'Keys' section, not included in the artifact bundle")
if any(part.startswith(".") for part in relative_path.parts):
        # TODO: There is no policy for this
# We should enquire as to whether such a policy should be instituted
# We're forbidding dotfiles to catch accidental uploads of e.g. .git or .htaccess
# Such cases are likely to be in error, and could carry security risks
errors.append("Dotfiles are forbidden")
search = re.search(analysis.extension_pattern(), relative_path_str)
ext_artifact = search.group("artifact") if search else None
ext_metadata = search.group("metadata") if search else None
if ext_artifact:
_LOGGER.info("Checking artifact rules for %s", full_path)
await _check_artifact_rules(base_path, relative_path, relative_paths, errors)
elif ext_metadata:
_LOGGER.info("Checking metadata rules for %s", full_path)
await _check_metadata_rules(base_path, relative_path, relative_paths, ext_metadata, errors, warnings)
else:
_LOGGER.info("Checking general rules for %s", full_path)
allowed_top_level = {"LICENSE", "NOTICE", "README", "CHANGES"}
if (relative_path.parent == pathlib.Path(".")) and (relative_path.name not in allowed_top_level):
warnings.append(f"Unknown top level file: {relative_path.name}")
    # We must aggregate the errors, and likewise the warnings, otherwise they will be removed by afresh=True
# Alternatively we could call Check.clear() manually
if errors:
await recorder_errors.failure("; ".join(errors), {"errors": errors}, primary_rel_path=relative_path_str)
if warnings:
await recorder_warnings.warning("; ".join(warnings), {"warnings": warnings}, primary_rel_path=relative_path_str)
if not (errors or warnings):
await recorder_success.success(
"Path structure and naming conventions conform to policy", {}, primary_rel_path=relative_path_str
)