atr/db/models.py (294 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""The data models to be persisted in the database."""
# NOTE: We can't use symbolic annotations here because sqlmodel doesn't support them
# from __future__ import annotations
import datetime
import enum
from typing import Any, Optional
import pydantic
import sqlalchemy
import sqlalchemy.event as event
import sqlmodel
import atr.db as db
class UTCDateTime(sqlalchemy.types.TypeDecorator):
"""
A custom column type to store datetime in sqlite.
As sqlite does not have timezone support, we ensure that all datetimes stored
within sqlite are converted to UTC. When retrieved, the datetimes are constructed
as offset-aware datetime with UTC as their timezone.
"""
impl = sqlalchemy.types.TIMESTAMP(timezone=True)
cache_ok = True
def process_bind_param(self, value, dialect): # type: ignore
if value:
if not isinstance(value, datetime.datetime):
raise ValueError(f"unexpected value type {type(value)}")
if value.tzinfo is None:
raise ValueError("encountered offset-naive datetime")
# store the datetime in UTC in sqlite as it does not support timezones
return value.astimezone(datetime.UTC)
else:
return value
def process_result_value(self, value, dialect): # type: ignore
if isinstance(value, datetime.datetime):
return value.replace(tzinfo=datetime.UTC)
else:
return value
class UserRole(str, enum.Enum):
COMMITTEE_MEMBER = "committee_member"
RELEASE_MANAGER = "release_manager"
COMMITTER = "committer"
VISITOR = "visitor"
ASF_MEMBER = "asf_member"
SYSADMIN = "sysadmin"
class KeyLink(sqlmodel.SQLModel, table=True):
committee_name: str = sqlmodel.Field(foreign_key="committee.name", primary_key=True)
key_fingerprint: str = sqlmodel.Field(foreign_key="publicsigningkey.fingerprint", primary_key=True)
class PublicSigningKey(sqlmodel.SQLModel, table=True):
# The fingerprint must be stored as lowercase hex
fingerprint: str = sqlmodel.Field(primary_key=True, unique=True)
# The algorithm is an RFC 4880 algorithm ID
algorithm: int
# Key length in bits
length: int
# Creation date
created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime))
# Expiration date
expires: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))
# The UID declared in the key
declared_uid: str | None
# The UID used by Apache
apache_uid: str
# The ASCII armored key
ascii_armored_key: str
# The committees that use this key
committees: list["Committee"] = sqlmodel.Relationship(back_populates="public_signing_keys", link_model=KeyLink)
class ReleasePolicy(sqlmodel.SQLModel, table=True):
id: int = sqlmodel.Field(default=None, primary_key=True)
mailto_addresses: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
manual_vote: bool = sqlmodel.Field(default=False)
min_hours: int = sqlmodel.Field(default=0)
release_checklist: str = sqlmodel.Field(default="")
pause_for_rm: bool = sqlmodel.Field(default=False)
start_vote_template: str = sqlmodel.Field(default="")
announce_release_template: str = sqlmodel.Field(default="")
# One-to-One: A release policy is associated with a project
project: "Project" = sqlmodel.Relationship(back_populates="release_policy")
class Committee(sqlmodel.SQLModel, table=True):
# TODO: Consider using key or label for primary string keys
# Then we can use simply "name" for full_name, and make it str rather than str | None
name: str = sqlmodel.Field(unique=True, primary_key=True)
full_name: str | None = sqlmodel.Field(default=None)
# True only if this is an incubator podling with a PPMC
is_podling: bool = sqlmodel.Field(default=False)
# One-to-many: A committee can have multiple child committees, each child committee belongs to one parent committee
child_committees: list["Committee"] = sqlmodel.Relationship(
sa_relationship_kwargs=dict(
backref=sqlalchemy.orm.backref("parent_committee", remote_side="Committee.name"),
),
)
parent_committee_name: str | None = sqlmodel.Field(default=None, foreign_key="committee.name")
# One-to-many: A committee can have multiple projects, each project belongs to one committee
projects: list["Project"] = sqlmodel.Relationship(back_populates="committee")
committee_members: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
committers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
release_managers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
# Many-to-many: A committee can have multiple signing keys, and a signing key can belong to multiple committees
public_signing_keys: list[PublicSigningKey] = sqlmodel.Relationship(back_populates="committees", link_model=KeyLink)
@property
def display_name(self) -> str:
"""Get the display name for the committee."""
name = self.full_name or self.name.title()
return f"{name} (PPMC)" if self.is_podling else name
class Project(sqlmodel.SQLModel, table=True):
# TODO: Consider using key or label for primary string keys
# Then we can use simply "name" for full_name, and make it str rather than str | None
name: str = sqlmodel.Field(unique=True, primary_key=True)
# TODO: Ideally full_name would be unique for str only, but that's complex
# We always include "Apache" in the full_name
full_name: str | None = sqlmodel.Field(default=None)
# True if this a podling project
# TODO: We should have this on Committee too, or instead
is_podling: bool = sqlmodel.Field(default=False)
is_retired: bool = sqlmodel.Field(default=False)
super_project_name: str | None = sqlmodel.Field(default=None, foreign_key="project.name")
# NOTE: Neither "Project" | None nor "Project | None" works
super_project: Optional["Project"] = sqlmodel.Relationship()
description: str | None = sqlmodel.Field(default=None)
category: str | None = sqlmodel.Field(default=None)
programming_languages: str | None = sqlmodel.Field(default=None)
# Many-to-one: A project belongs to one committee, a committee can have multiple projects
committee_name: str | None = sqlmodel.Field(default=None, foreign_key="committee.name")
committee: Committee | None = sqlmodel.Relationship(back_populates="projects")
# One-to-many: A project can have multiple releases, each release belongs to one project
releases: list["Release"] = sqlmodel.Relationship(back_populates="project")
# One-to-many: A project can have multiple distribution channels, each channel belongs to one project
distribution_channels: list["DistributionChannel"] = sqlmodel.Relationship(back_populates="project")
# Many-to-one: A Project can have one release policy, a release policy can be used by multiple entities
release_policy_id: int | None = sqlmodel.Field(default=None, foreign_key="releasepolicy.id", ondelete="CASCADE")
release_policy: ReleasePolicy | None = sqlmodel.Relationship(
cascade_delete=True, sa_relationship_kwargs={"cascade": "all, delete-orphan", "single_parent": True}
)
created: datetime.datetime = sqlmodel.Field(
default_factory=lambda: datetime.datetime.now(datetime.UTC), sa_column=sqlalchemy.Column(UTCDateTime)
)
created_by: str | None = sqlmodel.Field(default=None)
@property
def display_name(self) -> str:
"""Get the display name for the Project."""
return self.full_name or self.name
@property
def short_display_name(self) -> str:
"""Get the short display name for the Project."""
return self.display_name.removeprefix("Apache ")
async def releases_by_phase(self, phase: "ReleasePhase") -> list["Release"]:
"""Get the releases for the project by phase."""
query = (
sqlmodel.select(Release)
.where(
Release.project_name == self.name,
Release.phase == phase,
)
.order_by(db.validate_instrumented_attribute(Release.created).desc())
)
results = []
async with db.session() as data:
for result in (await data.execute(query)).all():
release = result[0]
results.append(release)
for release in results:
# Don't need to eager load and lose it when the session closes
release.project = self
return results
@property
async def candidate_drafts(self) -> list["Release"]:
"""Get the candidate drafts for the project."""
return await self.releases_by_phase(ReleasePhase.RELEASE_CANDIDATE_DRAFT)
@property
async def candidates(self) -> list["Release"]:
"""Get the candidate releases for the project."""
return await self.releases_by_phase(ReleasePhase.RELEASE_CANDIDATE)
@property
async def previews(self) -> list["Release"]:
"""Get the preview releases for the project."""
return await self.releases_by_phase(ReleasePhase.RELEASE_PREVIEW)
@property
async def full_releases(self) -> list["Release"]:
"""Get the full releases for the project."""
return await self.releases_by_phase(ReleasePhase.RELEASE)
@property
async def releases_in_progress(self) -> list["Release"]:
"""Get the releases in progress for the project."""
drafts = await self.candidate_drafts
candidates = await self.candidates
previews = await self.previews
return drafts + candidates + previews
class DistributionChannel(sqlmodel.SQLModel, table=True):
id: int = sqlmodel.Field(default=None, primary_key=True)
name: str = sqlmodel.Field(index=True, unique=True)
url: str
credentials: str
is_test: bool = sqlmodel.Field(default=False)
automation_endpoint: str
# Many-to-one: A distribution channel belongs to one project, a project can have multiple channels
project_name: str = sqlmodel.Field(foreign_key="project.name")
project: Project = sqlmodel.Relationship(back_populates="distribution_channels")
class VoteEntry(pydantic.BaseModel):
result: bool
summary: str
binding_votes: int
community_votes: int
start: datetime.datetime
end: datetime.datetime
class ReleaseStage(str, enum.Enum):
# A release candidate is being prepared
RELEASE_CANDIDATE = "release_candidate"
# A release is being prepared
RELEASE = "release"
# An existing release is being imported from ASF SVN dist
MIGRATION = "migration"
# A release candidate has failed at any CANDIDATE stage
FAILED = "failed"
class ReleasePhase(str, enum.Enum):
# Step 1: The candidate files are added from external sources and checked by ATR
RELEASE_CANDIDATE_DRAFT = "release_candidate_draft"
# Step 2: The project members are voting on the candidate release
RELEASE_CANDIDATE = "release_candidate"
# Step 3: The release files are being put in place
RELEASE_PREVIEW = "release_preview"
# Step 4: The release has been announced
RELEASE = "release"
class TaskStatus(str, enum.Enum):
"""Status of a task in the task queue."""
QUEUED = "queued"
ACTIVE = "active"
COMPLETED = "completed"
FAILED = "failed"
class TaskType(str, enum.Enum):
HASHING_CHECK = "hashing_check"
LICENSE_FILES = "license_files"
LICENSE_HEADERS = "license_headers"
MESSAGE_SEND = "message_send"
PATHS_CHECK = "paths_check"
RAT_CHECK = "rat_check"
# RSYNC_ANALYSE = "rsync_analyse"
SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
SIGNATURE_CHECK = "signature_check"
SVN_IMPORT_FILES = "svn_import_files"
TARGZ_INTEGRITY = "targz_integrity"
TARGZ_STRUCTURE = "targz_structure"
VOTE_INITIATE = "vote_initiate"
ZIPFORMAT_INTEGRITY = "zipformat_integrity"
ZIPFORMAT_LICENSE_FILES = "zipformat_license_files"
ZIPFORMAT_LICENSE_HEADERS = "zipformat_license_headers"
ZIPFORMAT_STRUCTURE = "zipformat_structure"
class Task(sqlmodel.SQLModel, table=True):
"""A task in the task queue."""
id: int = sqlmodel.Field(default=None, primary_key=True)
status: TaskStatus = sqlmodel.Field(default=TaskStatus.QUEUED, index=True)
task_type: TaskType
task_args: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON))
added: datetime.datetime = sqlmodel.Field(
default_factory=lambda: datetime.datetime.now(datetime.UTC),
sa_column=sqlalchemy.Column(UTCDateTime, index=True),
)
started: datetime.datetime | None = sqlmodel.Field(
default=None,
sa_column=sqlalchemy.Column(UTCDateTime),
)
pid: int | None = None
completed: datetime.datetime | None = sqlmodel.Field(
default=None,
sa_column=sqlalchemy.Column(UTCDateTime),
)
result: Any | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
error: str | None = None
# Used for check tasks
# We don't put these in task_args because we want to query them efficiently
release_name: str | None = sqlmodel.Field(default=None, foreign_key="release.name")
release: Optional["Release"] = sqlmodel.Relationship(back_populates="tasks")
draft_revision: str | None = sqlmodel.Field(default=None, index=True)
primary_rel_path: str | None = sqlmodel.Field(default=None, index=True)
# Create an index on status and added for efficient task claiming
__table_args__ = (
sqlalchemy.Index("ix_task_status_added", "status", "added"),
# Ensure valid status transitions:
# - QUEUED can transition to ACTIVE
# - ACTIVE can transition to COMPLETED or FAILED
# - COMPLETED and FAILED are terminal states
sqlalchemy.CheckConstraint(
"""
(
-- Initial state is always valid
status = 'QUEUED'
-- QUEUED -> ACTIVE requires setting started time and pid
OR (status = 'ACTIVE' AND started IS NOT NULL AND pid IS NOT NULL)
-- ACTIVE -> COMPLETED requires setting completed time and result
OR (status = 'COMPLETED' AND completed IS NOT NULL AND result IS NOT NULL)
-- ACTIVE -> FAILED requires setting completed time and error (result optional)
OR (status = 'FAILED' AND completed IS NOT NULL AND error IS NOT NULL)
)
""",
name="valid_task_status_transitions",
),
)
class Release(sqlmodel.SQLModel, table=True):
# We guarantee that "{project.name}-{version}" is unique
# Therefore we can use that for the name
name: str = sqlmodel.Field(default="", primary_key=True, unique=True)
stage: ReleaseStage
phase: ReleasePhase
created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime))
released: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))
# Many-to-one: A release belongs to one project, a project can have multiple releases
project_name: str = sqlmodel.Field(foreign_key="project.name")
project: Project = sqlmodel.Relationship(back_populates="releases")
package_managers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
# TODO: Not all releases have a version
# We could either make this str | None, or we could require version to be set on packages only
# For example, Apache Airflow Providers do not have an overall version
# They have one version per package, i.e. per provider
version: str
revision: str | None = sqlmodel.Field(default=None, index=True)
sboms: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
# Many-to-one: A release can have one release policy, a release policy can be used by multiple releases
release_policy_id: int | None = sqlmodel.Field(default=None, foreign_key="releasepolicy.id")
release_policy: ReleasePolicy | None = sqlmodel.Relationship(
cascade_delete=True, sa_relationship_kwargs={"cascade": "all, delete-orphan", "single_parent": True}
)
votes: list[VoteEntry] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
vote_started: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))
vote_resolved: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))
# One-to-many: A release can have multiple tasks
tasks: list["Task"] = sqlmodel.Relationship(
back_populates="release", sa_relationship_kwargs={"cascade": "all, delete-orphan"}
)
# One-to-many: A release can have multiple check results
check_results: list["CheckResult"] = sqlmodel.Relationship(back_populates="release")
# The combination of project_name and version must be unique
__table_args__ = (sqlalchemy.UniqueConstraint("project_name", "version", name="unique_project_version"),)
@property
def committee(self) -> Committee | None:
"""Get the committee for the release."""
project = self.project
if project is None:
return None
return project.committee
@property
def short_display_name(self) -> str:
"""Get the short display name for the release."""
return f"{self.project.short_display_name} {self.version}"
@property
def unwrap_revision(self) -> str:
"""Get the revision for the release."""
if self.revision is None:
raise ValueError("Release has no revision")
return self.revision
class SSHKey(sqlmodel.SQLModel, table=True):
fingerprint: str = sqlmodel.Field(primary_key=True)
key: str
asf_uid: str
class CheckResultStatus(str, enum.Enum):
EXCEPTION = "exception"
FAILURE = "failure"
SUCCESS = "success"
WARNING = "warning"
class CheckResult(sqlmodel.SQLModel, table=True):
id: int = sqlmodel.Field(default=None, primary_key=True)
release_name: str = sqlmodel.Field(foreign_key="release.name")
release: Release = sqlmodel.Relationship(back_populates="check_results")
checker: str
primary_rel_path: str | None = sqlmodel.Field(default=None, index=True)
created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime))
status: CheckResultStatus
message: str
data: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON))
# Link to the draft revisions that these results are for
histories: list["CheckResultHistoryLink"] = sqlmodel.Relationship(back_populates="check_result")
class CheckResultHistoryLink(sqlmodel.SQLModel, table=True):
# Composite primary key, automatically handled by SQLModel
check_result_id: int = sqlmodel.Field(foreign_key="checkresult.id", primary_key=True)
draft_revision: str = sqlmodel.Field(index=True, primary_key=True)
check_result: CheckResult = sqlmodel.Relationship(back_populates="histories")
class TextValue(sqlmodel.SQLModel, table=True):
# Composite primary key, automatically handled by SQLModel
ns: str = sqlmodel.Field(primary_key=True, index=True)
key: str = sqlmodel.Field(primary_key=True, index=True)
value: str = sqlmodel.Field()
@event.listens_for(Release, "before_insert")
def check_release_name(_mapper: sqlalchemy.orm.Mapper, _connection: sqlalchemy.Connection, release: Release) -> None:
if release.name == "":
release.name = release_name(release.project.name, release.version)
def project_version(release_name: str) -> tuple[str, str]:
"""Return the project and version for a given release name."""
try:
project_name, version_name = release_name.rsplit("-", 1)
return (project_name, version_name)
except ValueError:
raise ValueError(f"Invalid release name: {release_name}")
def release_name(project_name: str, version_name: str) -> str:
"""Return the release name for a given project and version."""
return f"{project_name}-{version_name}"