atr/db/models.py (294 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """The data models to be persisted in the database.""" # NOTE: We can't use symbolic annotations here because sqlmodel doesn't support them # from __future__ import annotations import datetime import enum from typing import Any, Optional import pydantic import sqlalchemy import sqlalchemy.event as event import sqlmodel import atr.db as db class UTCDateTime(sqlalchemy.types.TypeDecorator): """ A custom column type to store datetime in sqlite. As sqlite does not have timezone support, we ensure that all datetimes stored within sqlite are converted to UTC. When retrieved, the datetimes are constructed as offset-aware datetime with UTC as their timezone. """ impl = sqlalchemy.types.TIMESTAMP(timezone=True) cache_ok = True def process_bind_param(self, value, dialect): # type: ignore if value: if not isinstance(value, datetime.datetime): raise ValueError(f"unexpected value type {type(value)}") if value.tzinfo is None: raise ValueError("encountered offset-naive datetime") # store the datetime in UTC in sqlite as it does not support timezones return value.astimezone(datetime.UTC) else: return value def process_result_value(self, value, dialect): # type: ignore if isinstance(value, datetime.datetime): return value.replace(tzinfo=datetime.UTC) else: return value class UserRole(str, enum.Enum): COMMITTEE_MEMBER = "committee_member" RELEASE_MANAGER = "release_manager" COMMITTER = "committer" VISITOR = "visitor" ASF_MEMBER = "asf_member" SYSADMIN = "sysadmin" class KeyLink(sqlmodel.SQLModel, table=True): committee_name: str = sqlmodel.Field(foreign_key="committee.name", primary_key=True) key_fingerprint: str = sqlmodel.Field(foreign_key="publicsigningkey.fingerprint", primary_key=True) class PublicSigningKey(sqlmodel.SQLModel, table=True): # The fingerprint must be stored as lowercase hex fingerprint: str = sqlmodel.Field(primary_key=True, unique=True) # The algorithm is an RFC 4880 algorithm ID algorithm: int # Key length in bits length: int # Creation date created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime)) # Expiration date expires: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime)) # The UID declared in the key declared_uid: str | None # The UID used by Apache apache_uid: str # The ASCII armored key ascii_armored_key: str # The committees that use this key committees: list["Committee"] = sqlmodel.Relationship(back_populates="public_signing_keys", link_model=KeyLink) class ReleasePolicy(sqlmodel.SQLModel, table=True): id: int = sqlmodel.Field(default=None, primary_key=True) mailto_addresses: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) manual_vote: bool = sqlmodel.Field(default=False) min_hours: int = sqlmodel.Field(default=0) release_checklist: str = sqlmodel.Field(default="") pause_for_rm: bool = sqlmodel.Field(default=False) start_vote_template: str = sqlmodel.Field(default="") announce_release_template: str = sqlmodel.Field(default="") # One-to-One: A release policy is associated with a project project: "Project" = sqlmodel.Relationship(back_populates="release_policy") class Committee(sqlmodel.SQLModel, table=True): # TODO: Consider using key or label for primary string keys # Then we can use simply "name" for full_name, and make it str rather than str | None name: str = sqlmodel.Field(unique=True, primary_key=True) full_name: str | None = sqlmodel.Field(default=None) # True only if this is an incubator podling with a PPMC is_podling: bool = sqlmodel.Field(default=False) # One-to-many: A committee can have multiple child committees, each child committee belongs to one parent committee child_committees: list["Committee"] = sqlmodel.Relationship( sa_relationship_kwargs=dict( backref=sqlalchemy.orm.backref("parent_committee", remote_side="Committee.name"), ), ) parent_committee_name: str | None = sqlmodel.Field(default=None, foreign_key="committee.name") # One-to-many: A committee can have multiple projects, each project belongs to one committee projects: list["Project"] = sqlmodel.Relationship(back_populates="committee") committee_members: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) committers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) release_managers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) # Many-to-many: A committee can have multiple signing keys, and a signing key can belong to multiple committees public_signing_keys: list[PublicSigningKey] = sqlmodel.Relationship(back_populates="committees", link_model=KeyLink) @property def display_name(self) -> str: """Get the display name for the committee.""" name = self.full_name or self.name.title() return f"{name} (PPMC)" if self.is_podling else name class Project(sqlmodel.SQLModel, table=True): # TODO: Consider using key or label for primary string keys # Then we can use simply "name" for full_name, and make it str rather than str | None name: str = sqlmodel.Field(unique=True, primary_key=True) # TODO: Ideally full_name would be unique for str only, but that's complex # We always include "Apache" in the full_name full_name: str | None = sqlmodel.Field(default=None) # True if this a podling project # TODO: We should have this on Committee too, or instead is_podling: bool = sqlmodel.Field(default=False) is_retired: bool = sqlmodel.Field(default=False) super_project_name: str | None = sqlmodel.Field(default=None, foreign_key="project.name") # NOTE: Neither "Project" | None nor "Project | None" works super_project: Optional["Project"] = sqlmodel.Relationship() description: str | None = sqlmodel.Field(default=None) category: str | None = sqlmodel.Field(default=None) programming_languages: str | None = sqlmodel.Field(default=None) # Many-to-one: A project belongs to one committee, a committee can have multiple projects committee_name: str | None = sqlmodel.Field(default=None, foreign_key="committee.name") committee: Committee | None = sqlmodel.Relationship(back_populates="projects") # One-to-many: A project can have multiple releases, each release belongs to one project releases: list["Release"] = sqlmodel.Relationship(back_populates="project") # One-to-many: A project can have multiple distribution channels, each channel belongs to one project distribution_channels: list["DistributionChannel"] = sqlmodel.Relationship(back_populates="project") # Many-to-one: A Project can have one release policy, a release policy can be used by multiple entities release_policy_id: int | None = sqlmodel.Field(default=None, foreign_key="releasepolicy.id", ondelete="CASCADE") release_policy: ReleasePolicy | None = sqlmodel.Relationship( cascade_delete=True, sa_relationship_kwargs={"cascade": "all, delete-orphan", "single_parent": True} ) created: datetime.datetime = sqlmodel.Field( default_factory=lambda: datetime.datetime.now(datetime.UTC), sa_column=sqlalchemy.Column(UTCDateTime) ) created_by: str | None = sqlmodel.Field(default=None) @property def display_name(self) -> str: """Get the display name for the Project.""" return self.full_name or self.name @property def short_display_name(self) -> str: """Get the short display name for the Project.""" return self.display_name.removeprefix("Apache ") async def releases_by_phase(self, phase: "ReleasePhase") -> list["Release"]: """Get the releases for the project by phase.""" query = ( sqlmodel.select(Release) .where( Release.project_name == self.name, Release.phase == phase, ) .order_by(db.validate_instrumented_attribute(Release.created).desc()) ) results = [] async with db.session() as data: for result in (await data.execute(query)).all(): release = result[0] results.append(release) for release in results: # Don't need to eager load and lose it when the session closes release.project = self return results @property async def candidate_drafts(self) -> list["Release"]: """Get the candidate drafts for the project.""" return await self.releases_by_phase(ReleasePhase.RELEASE_CANDIDATE_DRAFT) @property async def candidates(self) -> list["Release"]: """Get the candidate releases for the project.""" return await self.releases_by_phase(ReleasePhase.RELEASE_CANDIDATE) @property async def previews(self) -> list["Release"]: """Get the preview releases for the project.""" return await self.releases_by_phase(ReleasePhase.RELEASE_PREVIEW) @property async def full_releases(self) -> list["Release"]: """Get the full releases for the project.""" return await self.releases_by_phase(ReleasePhase.RELEASE) @property async def releases_in_progress(self) -> list["Release"]: """Get the releases in progress for the project.""" drafts = await self.candidate_drafts candidates = await self.candidates previews = await self.previews return drafts + candidates + previews class DistributionChannel(sqlmodel.SQLModel, table=True): id: int = sqlmodel.Field(default=None, primary_key=True) name: str = sqlmodel.Field(index=True, unique=True) url: str credentials: str is_test: bool = sqlmodel.Field(default=False) automation_endpoint: str # Many-to-one: A distribution channel belongs to one project, a project can have multiple channels project_name: str = sqlmodel.Field(foreign_key="project.name") project: Project = sqlmodel.Relationship(back_populates="distribution_channels") class VoteEntry(pydantic.BaseModel): result: bool summary: str binding_votes: int community_votes: int start: datetime.datetime end: datetime.datetime class ReleaseStage(str, enum.Enum): # A release candidate is being prepared RELEASE_CANDIDATE = "release_candidate" # A release is being prepared RELEASE = "release" # An existing release is being imported from ASF SVN dist MIGRATION = "migration" # A release candidate has failed at any CANDIDATE stage FAILED = "failed" class ReleasePhase(str, enum.Enum): # Step 1: The candidate files are added from external sources and checked by ATR RELEASE_CANDIDATE_DRAFT = "release_candidate_draft" # Step 2: The project members are voting on the candidate release RELEASE_CANDIDATE = "release_candidate" # Step 3: The release files are being put in place RELEASE_PREVIEW = "release_preview" # Step 4: The release has been announced RELEASE = "release" class TaskStatus(str, enum.Enum): """Status of a task in the task queue.""" QUEUED = "queued" ACTIVE = "active" COMPLETED = "completed" FAILED = "failed" class TaskType(str, enum.Enum): HASHING_CHECK = "hashing_check" LICENSE_FILES = "license_files" LICENSE_HEADERS = "license_headers" MESSAGE_SEND = "message_send" PATHS_CHECK = "paths_check" RAT_CHECK = "rat_check" # RSYNC_ANALYSE = "rsync_analyse" SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx" SIGNATURE_CHECK = "signature_check" SVN_IMPORT_FILES = "svn_import_files" TARGZ_INTEGRITY = "targz_integrity" TARGZ_STRUCTURE = "targz_structure" VOTE_INITIATE = "vote_initiate" ZIPFORMAT_INTEGRITY = "zipformat_integrity" ZIPFORMAT_LICENSE_FILES = "zipformat_license_files" ZIPFORMAT_LICENSE_HEADERS = "zipformat_license_headers" ZIPFORMAT_STRUCTURE = "zipformat_structure" class Task(sqlmodel.SQLModel, table=True): """A task in the task queue.""" id: int = sqlmodel.Field(default=None, primary_key=True) status: TaskStatus = sqlmodel.Field(default=TaskStatus.QUEUED, index=True) task_type: TaskType task_args: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON)) added: datetime.datetime = sqlmodel.Field( default_factory=lambda: datetime.datetime.now(datetime.UTC), sa_column=sqlalchemy.Column(UTCDateTime, index=True), ) started: datetime.datetime | None = sqlmodel.Field( default=None, sa_column=sqlalchemy.Column(UTCDateTime), ) pid: int | None = None completed: datetime.datetime | None = sqlmodel.Field( default=None, sa_column=sqlalchemy.Column(UTCDateTime), ) result: Any | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) error: str | None = None # Used for check tasks # We don't put these in task_args because we want to query them efficiently release_name: str | None = sqlmodel.Field(default=None, foreign_key="release.name") release: Optional["Release"] = sqlmodel.Relationship(back_populates="tasks") draft_revision: str | None = sqlmodel.Field(default=None, index=True) primary_rel_path: str | None = sqlmodel.Field(default=None, index=True) # Create an index on status and added for efficient task claiming __table_args__ = ( sqlalchemy.Index("ix_task_status_added", "status", "added"), # Ensure valid status transitions: # - QUEUED can transition to ACTIVE # - ACTIVE can transition to COMPLETED or FAILED # - COMPLETED and FAILED are terminal states sqlalchemy.CheckConstraint( """ ( -- Initial state is always valid status = 'QUEUED' -- QUEUED -> ACTIVE requires setting started time and pid OR (status = 'ACTIVE' AND started IS NOT NULL AND pid IS NOT NULL) -- ACTIVE -> COMPLETED requires setting completed time and result OR (status = 'COMPLETED' AND completed IS NOT NULL AND result IS NOT NULL) -- ACTIVE -> FAILED requires setting completed time and error (result optional) OR (status = 'FAILED' AND completed IS NOT NULL AND error IS NOT NULL) ) """, name="valid_task_status_transitions", ), ) class Release(sqlmodel.SQLModel, table=True): # We guarantee that "{project.name}-{version}" is unique # Therefore we can use that for the name name: str = sqlmodel.Field(default="", primary_key=True, unique=True) stage: ReleaseStage phase: ReleasePhase created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime)) released: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime)) # Many-to-one: A release belongs to one project, a project can have multiple releases project_name: str = sqlmodel.Field(foreign_key="project.name") project: Project = sqlmodel.Relationship(back_populates="releases") package_managers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) # TODO: Not all releases have a version # We could either make this str | None, or we could require version to be set on packages only # For example, Apache Airflow Providers do not have an overall version # They have one version per package, i.e. per provider version: str revision: str | None = sqlmodel.Field(default=None, index=True) sboms: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) # Many-to-one: A release can have one release policy, a release policy can be used by multiple releases release_policy_id: int | None = sqlmodel.Field(default=None, foreign_key="releasepolicy.id") release_policy: ReleasePolicy | None = sqlmodel.Relationship( cascade_delete=True, sa_relationship_kwargs={"cascade": "all, delete-orphan", "single_parent": True} ) votes: list[VoteEntry] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON)) vote_started: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime)) vote_resolved: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime)) # One-to-many: A release can have multiple tasks tasks: list["Task"] = sqlmodel.Relationship( back_populates="release", sa_relationship_kwargs={"cascade": "all, delete-orphan"} ) # One-to-many: A release can have multiple check results check_results: list["CheckResult"] = sqlmodel.Relationship(back_populates="release") # The combination of project_name and version must be unique __table_args__ = (sqlalchemy.UniqueConstraint("project_name", "version", name="unique_project_version"),) @property def committee(self) -> Committee | None: """Get the committee for the release.""" project = self.project if project is None: return None return project.committee @property def short_display_name(self) -> str: """Get the short display name for the release.""" return f"{self.project.short_display_name} {self.version}" @property def unwrap_revision(self) -> str: """Get the revision for the release.""" if self.revision is None: raise ValueError("Release has no revision") return self.revision class SSHKey(sqlmodel.SQLModel, table=True): fingerprint: str = sqlmodel.Field(primary_key=True) key: str asf_uid: str class CheckResultStatus(str, enum.Enum): EXCEPTION = "exception" FAILURE = "failure" SUCCESS = "success" WARNING = "warning" class CheckResult(sqlmodel.SQLModel, table=True): id: int = sqlmodel.Field(default=None, primary_key=True) release_name: str = sqlmodel.Field(foreign_key="release.name") release: Release = sqlmodel.Relationship(back_populates="check_results") checker: str primary_rel_path: str | None = sqlmodel.Field(default=None, index=True) created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime)) status: CheckResultStatus message: str data: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON)) # Link to the draft revisions that these results are for histories: list["CheckResultHistoryLink"] = sqlmodel.Relationship(back_populates="check_result") class CheckResultHistoryLink(sqlmodel.SQLModel, table=True): # Composite primary key, automatically handled by SQLModel check_result_id: int = sqlmodel.Field(foreign_key="checkresult.id", primary_key=True) draft_revision: str = sqlmodel.Field(index=True, primary_key=True) check_result: CheckResult = sqlmodel.Relationship(back_populates="histories") class TextValue(sqlmodel.SQLModel, table=True): # Composite primary key, automatically handled by SQLModel ns: str = sqlmodel.Field(primary_key=True, index=True) key: str = sqlmodel.Field(primary_key=True, index=True) value: str = sqlmodel.Field() @event.listens_for(Release, "before_insert") def check_release_name(_mapper: sqlalchemy.orm.Mapper, _connection: sqlalchemy.Connection, release: Release) -> None: if release.name == "": release.name = release_name(release.project.name, release.version) def project_version(release_name: str) -> tuple[str, str]: """Return the project and version for a given release name.""" try: project_name, version_name = release_name.rsplit("-", 1) return (project_name, version_name) except ValueError: raise ValueError(f"Invalid release name: {release_name}") def release_name(project_name: str, version_name: str) -> str: """Return the release name for a given project and version.""" return f"{project_name}-{version_name}"