atr/tasks/__init__.py (131 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import contextlib from collections.abc import Awaitable, Callable, Coroutine from typing import Any, Final import atr.db as db import atr.db.models as models import atr.tasks.checks.hashing as hashing import atr.tasks.checks.license as license import atr.tasks.checks.paths as paths import atr.tasks.checks.rat as rat import atr.tasks.checks.signature as signature import atr.tasks.checks.targz as targz import atr.tasks.checks.zipformat as zipformat import atr.tasks.message as message import atr.tasks.sbom as sbom import atr.tasks.svn as svn import atr.tasks.vote as vote import atr.util as util async def asc_checks(release: models.Release, draft_revision: str, signature_path: str) -> list[models.Task]: """Create signature check task for a .asc file.""" tasks = [] if release.committee: tasks.append( queued( models.TaskType.SIGNATURE_CHECK, release, draft_revision, signature_path, {"committee_name": release.committee.name}, ) ) return tasks async def draft_checks( project_name: str, release_version: str, draft_revision: str, caller_data: db.Session | None = None ) -> int: """Core logic to analyse a draft revision and queue checks.""" # Construct path to the specific revision # We don't have the release object here, so we can't use util.release_directory revision_path = util.get_unfinished_dir() / project_name / release_version / draft_revision relative_paths = [path async for path in util.paths_recursive(revision_path)] session_context = db.session() if (caller_data is None) else contextlib.nullcontext(caller_data) async with session_context as data: release = await data.release(name=models.release_name(project_name, release_version), _committee=True).demand( RuntimeError("Release not found") ) for path in relative_paths: path_str = str(path) task_function: Callable[[models.Release, str, str], Awaitable[list[models.Task]]] | None = None for suffix, task_function in TASK_FUNCTIONS.items(): if path.name.endswith(suffix): for task in await task_function(release, draft_revision, path_str): task.draft_revision = draft_revision data.add(task) path_check_task = queued(models.TaskType.PATHS_CHECK, release, draft_revision) data.add(path_check_task) if caller_data is None: await data.commit() return len(relative_paths) def queued( task_type: models.TaskType, release: models.Release, draft_revision: str, primary_rel_path: str | None = None, extra_args: dict[str, Any] | None = None, ) -> models.Task: return models.Task( status=models.TaskStatus.QUEUED, task_type=task_type, task_args=extra_args or {}, release_name=release.name, draft_revision=draft_revision, primary_rel_path=primary_rel_path, ) def resolve(task_type: models.TaskType) -> Callable[..., Awaitable[str | None]]: # noqa: C901 match task_type: case models.TaskType.HASHING_CHECK: return hashing.check case models.TaskType.LICENSE_FILES: return license.files case models.TaskType.LICENSE_HEADERS: return license.headers case models.TaskType.MESSAGE_SEND: return message.send case models.TaskType.PATHS_CHECK: return paths.check case models.TaskType.RAT_CHECK: return rat.check # case models.TaskType.RSYNC_ANALYSE: # return rsync.analyse case models.TaskType.SBOM_GENERATE_CYCLONEDX: return sbom.generate_cyclonedx case models.TaskType.SIGNATURE_CHECK: return signature.check case models.TaskType.SVN_IMPORT_FILES: return svn.import_files case models.TaskType.TARGZ_INTEGRITY: return targz.integrity case models.TaskType.TARGZ_STRUCTURE: return targz.structure case models.TaskType.VOTE_INITIATE: return vote.initiate case models.TaskType.ZIPFORMAT_INTEGRITY: return zipformat.integrity case models.TaskType.ZIPFORMAT_STRUCTURE: return zipformat.structure case models.TaskType.ZIPFORMAT_LICENSE_FILES: return zipformat.license_files case models.TaskType.ZIPFORMAT_LICENSE_HEADERS: return zipformat.license_headers # NOTE: Do NOT add "case _" here # Otherwise we lose exhaustiveness checking async def sha_checks(release: models.Release, draft_revision: str, hash_file: str) -> list[models.Task]: """Create hash check task for a .sha256 or .sha512 file.""" tasks = [] tasks.append(queued(models.TaskType.HASHING_CHECK, release, draft_revision, hash_file)) return tasks async def tar_gz_checks(release: models.Release, draft_revision: str, path: str) -> list[models.Task]: """Create check tasks for a .tar.gz or .tgz file.""" tasks = [ queued(models.TaskType.LICENSE_FILES, release, draft_revision, path), queued(models.TaskType.LICENSE_HEADERS, release, draft_revision, path), queued(models.TaskType.RAT_CHECK, release, draft_revision, path), queued(models.TaskType.TARGZ_INTEGRITY, release, draft_revision, path), queued(models.TaskType.TARGZ_STRUCTURE, release, draft_revision, path), ] return tasks async def zip_checks(release: models.Release, draft_revision: str, path: str) -> list[models.Task]: """Create check tasks for a .zip file.""" tasks = [ queued(models.TaskType.ZIPFORMAT_INTEGRITY, release, draft_revision, path), queued(models.TaskType.ZIPFORMAT_LICENSE_FILES, release, draft_revision, path), queued(models.TaskType.ZIPFORMAT_LICENSE_HEADERS, release, draft_revision, path), queued(models.TaskType.ZIPFORMAT_STRUCTURE, release, draft_revision, path), ] return tasks TASK_FUNCTIONS: Final[dict[str, Callable[..., Coroutine[Any, Any, list[models.Task]]]]] = { ".asc": asc_checks, ".sha256": sha_checks, ".sha512": sha_checks, ".tar.gz": tar_gz_checks, ".tgz": tar_gz_checks, ".zip": zip_checks, }