# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""The data models to be persisted in the database."""

# NOTE: We can't use symbolic annotations here because sqlmodel doesn't support them
# from __future__ import annotations

import datetime
import enum
from typing import Any, Optional

import pydantic
import sqlalchemy
import sqlalchemy.event as event
import sqlmodel

import atr.db as db


class UTCDateTime(sqlalchemy.types.TypeDecorator):
    """
    A custom column type to store datetime in sqlite.

    As sqlite does not have timezone support, we ensure that all datetimes stored
    within sqlite are converted to UTC. When retrieved, the datetimes are constructed
    as offset-aware datetime with UTC as their timezone.
    """

    impl = sqlalchemy.types.TIMESTAMP(timezone=True)

    cache_ok = True

    def process_bind_param(self, value, dialect):  # type: ignore
        if value:
            if not isinstance(value, datetime.datetime):
                raise ValueError(f"unexpected value type {type(value)}")

            if value.tzinfo is None:
                raise ValueError("encountered offset-naive datetime")

            # store the datetime in UTC in sqlite as it does not support timezones
            return value.astimezone(datetime.UTC)
        else:
            return value

    def process_result_value(self, value, dialect):  # type: ignore
        if isinstance(value, datetime.datetime):
            return value.replace(tzinfo=datetime.UTC)
        else:
            return value


class UserRole(str, enum.Enum):
    COMMITTEE_MEMBER = "committee_member"
    RELEASE_MANAGER = "release_manager"
    COMMITTER = "committer"
    VISITOR = "visitor"
    ASF_MEMBER = "asf_member"
    SYSADMIN = "sysadmin"


class KeyLink(sqlmodel.SQLModel, table=True):
    committee_name: str = sqlmodel.Field(foreign_key="committee.name", primary_key=True)
    key_fingerprint: str = sqlmodel.Field(foreign_key="publicsigningkey.fingerprint", primary_key=True)


class PublicSigningKey(sqlmodel.SQLModel, table=True):
    # The fingerprint must be stored as lowercase hex
    fingerprint: str = sqlmodel.Field(primary_key=True, unique=True)
    # The algorithm is an RFC 4880 algorithm ID
    algorithm: int
    # Key length in bits
    length: int
    # Creation date
    created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime))
    # Expiration date
    expires: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))
    # The UID declared in the key
    declared_uid: str | None
    # The UID used by Apache
    apache_uid: str
    # The ASCII armored key
    ascii_armored_key: str
    # The committees that use this key
    committees: list["Committee"] = sqlmodel.Relationship(back_populates="public_signing_keys", link_model=KeyLink)


class ReleasePolicy(sqlmodel.SQLModel, table=True):
    id: int = sqlmodel.Field(default=None, primary_key=True)
    mailto_addresses: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
    manual_vote: bool = sqlmodel.Field(default=False)
    min_hours: int = sqlmodel.Field(default=0)
    release_checklist: str = sqlmodel.Field(default="")
    pause_for_rm: bool = sqlmodel.Field(default=False)
    start_vote_template: str = sqlmodel.Field(default="")
    announce_release_template: str = sqlmodel.Field(default="")

    # One-to-One: A release policy is associated with a project
    project: "Project" = sqlmodel.Relationship(back_populates="release_policy")


class Committee(sqlmodel.SQLModel, table=True):
    # TODO: Consider using key or label for primary string keys
    # Then we can use simply "name" for full_name, and make it str rather than str | None
    name: str = sqlmodel.Field(unique=True, primary_key=True)
    full_name: str | None = sqlmodel.Field(default=None)
    # True only if this is an incubator podling with a PPMC
    is_podling: bool = sqlmodel.Field(default=False)

    # One-to-many: A committee can have multiple child committees, each child committee belongs to one parent committee
    child_committees: list["Committee"] = sqlmodel.Relationship(
        sa_relationship_kwargs=dict(
            backref=sqlalchemy.orm.backref("parent_committee", remote_side="Committee.name"),
        ),
    )
    parent_committee_name: str | None = sqlmodel.Field(default=None, foreign_key="committee.name")
    # One-to-many: A committee can have multiple projects, each project belongs to one committee
    projects: list["Project"] = sqlmodel.Relationship(back_populates="committee")

    committee_members: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
    committers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
    release_managers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))

    # Many-to-many: A committee can have multiple signing keys, and a signing key can belong to multiple committees
    public_signing_keys: list[PublicSigningKey] = sqlmodel.Relationship(back_populates="committees", link_model=KeyLink)

    @property
    def display_name(self) -> str:
        """Get the display name for the committee."""
        name = self.full_name or self.name.title()
        return f"{name} (PPMC)" if self.is_podling else name


class Project(sqlmodel.SQLModel, table=True):
    # TODO: Consider using key or label for primary string keys
    # Then we can use simply "name" for full_name, and make it str rather than str | None
    name: str = sqlmodel.Field(unique=True, primary_key=True)
    # TODO: Ideally full_name would be unique for str only, but that's complex
    # We always include "Apache" in the full_name
    full_name: str | None = sqlmodel.Field(default=None)

    # True if this a podling project
    # TODO: We should have this on Committee too, or instead
    is_podling: bool = sqlmodel.Field(default=False)
    is_retired: bool = sqlmodel.Field(default=False)

    super_project_name: str | None = sqlmodel.Field(default=None, foreign_key="project.name")
    # NOTE: Neither "Project" | None nor "Project | None" works
    super_project: Optional["Project"] = sqlmodel.Relationship()

    description: str | None = sqlmodel.Field(default=None)
    category: str | None = sqlmodel.Field(default=None)
    programming_languages: str | None = sqlmodel.Field(default=None)

    # Many-to-one: A project belongs to one committee, a committee can have multiple projects
    committee_name: str | None = sqlmodel.Field(default=None, foreign_key="committee.name")
    committee: Committee | None = sqlmodel.Relationship(back_populates="projects")

    # One-to-many: A project can have multiple releases, each release belongs to one project
    releases: list["Release"] = sqlmodel.Relationship(back_populates="project")

    # One-to-many: A project can have multiple distribution channels, each channel belongs to one project
    distribution_channels: list["DistributionChannel"] = sqlmodel.Relationship(back_populates="project")

    # Many-to-one: A Project can have one release policy, a release policy can be used by multiple entities
    release_policy_id: int | None = sqlmodel.Field(default=None, foreign_key="releasepolicy.id", ondelete="CASCADE")
    release_policy: ReleasePolicy | None = sqlmodel.Relationship(
        cascade_delete=True, sa_relationship_kwargs={"cascade": "all, delete-orphan", "single_parent": True}
    )

    created: datetime.datetime = sqlmodel.Field(
        default_factory=lambda: datetime.datetime.now(datetime.UTC), sa_column=sqlalchemy.Column(UTCDateTime)
    )
    created_by: str | None = sqlmodel.Field(default=None)

    @property
    def display_name(self) -> str:
        """Get the display name for the Project."""
        return self.full_name or self.name

    @property
    def short_display_name(self) -> str:
        """Get the short display name for the Project."""
        return self.display_name.removeprefix("Apache ")

    async def releases_by_phase(self, phase: "ReleasePhase") -> list["Release"]:
        """Get the releases for the project by phase."""
        query = (
            sqlmodel.select(Release)
            .where(
                Release.project_name == self.name,
                Release.phase == phase,
            )
            .order_by(db.validate_instrumented_attribute(Release.created).desc())
        )

        results = []
        async with db.session() as data:
            for result in (await data.execute(query)).all():
                release = result[0]
                results.append(release)
        for release in results:
            # Don't need to eager load and lose it when the session closes
            release.project = self
        return results

    @property
    async def candidate_drafts(self) -> list["Release"]:
        """Get the candidate drafts for the project."""
        return await self.releases_by_phase(ReleasePhase.RELEASE_CANDIDATE_DRAFT)

    @property
    async def candidates(self) -> list["Release"]:
        """Get the candidate releases for the project."""
        return await self.releases_by_phase(ReleasePhase.RELEASE_CANDIDATE)

    @property
    async def previews(self) -> list["Release"]:
        """Get the preview releases for the project."""
        return await self.releases_by_phase(ReleasePhase.RELEASE_PREVIEW)

    @property
    async def full_releases(self) -> list["Release"]:
        """Get the full releases for the project."""
        return await self.releases_by_phase(ReleasePhase.RELEASE)

    @property
    async def releases_in_progress(self) -> list["Release"]:
        """Get the releases in progress for the project."""
        drafts = await self.candidate_drafts
        candidates = await self.candidates
        previews = await self.previews
        return drafts + candidates + previews


class DistributionChannel(sqlmodel.SQLModel, table=True):
    id: int = sqlmodel.Field(default=None, primary_key=True)
    name: str = sqlmodel.Field(index=True, unique=True)
    url: str
    credentials: str
    is_test: bool = sqlmodel.Field(default=False)
    automation_endpoint: str

    # Many-to-one: A distribution channel belongs to one project, a project can have multiple channels
    project_name: str = sqlmodel.Field(foreign_key="project.name")
    project: Project = sqlmodel.Relationship(back_populates="distribution_channels")


class VoteEntry(pydantic.BaseModel):
    result: bool
    summary: str
    binding_votes: int
    community_votes: int
    start: datetime.datetime
    end: datetime.datetime


class ReleaseStage(str, enum.Enum):
    # A release candidate is being prepared
    RELEASE_CANDIDATE = "release_candidate"
    # A release is being prepared
    RELEASE = "release"
    # An existing release is being imported from ASF SVN dist
    MIGRATION = "migration"
    # A release candidate has failed at any CANDIDATE stage
    FAILED = "failed"


class ReleasePhase(str, enum.Enum):
    # Step 1: The candidate files are added from external sources and checked by ATR
    RELEASE_CANDIDATE_DRAFT = "release_candidate_draft"
    # Step 2: The project members are voting on the candidate release
    RELEASE_CANDIDATE = "release_candidate"
    # Step 3: The release files are being put in place
    RELEASE_PREVIEW = "release_preview"
    # Step 4: The release has been announced
    RELEASE = "release"


class TaskStatus(str, enum.Enum):
    """Status of a task in the task queue."""

    QUEUED = "queued"
    ACTIVE = "active"
    COMPLETED = "completed"
    FAILED = "failed"


class TaskType(str, enum.Enum):
    HASHING_CHECK = "hashing_check"
    LICENSE_FILES = "license_files"
    LICENSE_HEADERS = "license_headers"
    MESSAGE_SEND = "message_send"
    PATHS_CHECK = "paths_check"
    RAT_CHECK = "rat_check"
    # RSYNC_ANALYSE = "rsync_analyse"
    SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
    SIGNATURE_CHECK = "signature_check"
    SVN_IMPORT_FILES = "svn_import_files"
    TARGZ_INTEGRITY = "targz_integrity"
    TARGZ_STRUCTURE = "targz_structure"
    VOTE_INITIATE = "vote_initiate"
    ZIPFORMAT_INTEGRITY = "zipformat_integrity"
    ZIPFORMAT_LICENSE_FILES = "zipformat_license_files"
    ZIPFORMAT_LICENSE_HEADERS = "zipformat_license_headers"
    ZIPFORMAT_STRUCTURE = "zipformat_structure"


class Task(sqlmodel.SQLModel, table=True):
    """A task in the task queue."""

    id: int = sqlmodel.Field(default=None, primary_key=True)
    status: TaskStatus = sqlmodel.Field(default=TaskStatus.QUEUED, index=True)
    task_type: TaskType
    task_args: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON))
    added: datetime.datetime = sqlmodel.Field(
        default_factory=lambda: datetime.datetime.now(datetime.UTC),
        sa_column=sqlalchemy.Column(UTCDateTime, index=True),
    )
    started: datetime.datetime | None = sqlmodel.Field(
        default=None,
        sa_column=sqlalchemy.Column(UTCDateTime),
    )
    pid: int | None = None
    completed: datetime.datetime | None = sqlmodel.Field(
        default=None,
        sa_column=sqlalchemy.Column(UTCDateTime),
    )
    result: Any | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
    error: str | None = None

    # Used for check tasks
    # We don't put these in task_args because we want to query them efficiently
    release_name: str | None = sqlmodel.Field(default=None, foreign_key="release.name")
    release: Optional["Release"] = sqlmodel.Relationship(back_populates="tasks")
    draft_revision: str | None = sqlmodel.Field(default=None, index=True)
    primary_rel_path: str | None = sqlmodel.Field(default=None, index=True)

    # Create an index on status and added for efficient task claiming
    __table_args__ = (
        sqlalchemy.Index("ix_task_status_added", "status", "added"),
        # Ensure valid status transitions:
        # - QUEUED can transition to ACTIVE
        # - ACTIVE can transition to COMPLETED or FAILED
        # - COMPLETED and FAILED are terminal states
        sqlalchemy.CheckConstraint(
            """
            (
                -- Initial state is always valid
                status = 'QUEUED'
                -- QUEUED -> ACTIVE requires setting started time and pid
                OR (status = 'ACTIVE' AND started IS NOT NULL AND pid IS NOT NULL)
                -- ACTIVE -> COMPLETED requires setting completed time and result
                OR (status = 'COMPLETED' AND completed IS NOT NULL AND result IS NOT NULL)
                -- ACTIVE -> FAILED requires setting completed time and error (result optional)
                OR (status = 'FAILED' AND completed IS NOT NULL AND error IS NOT NULL)
            )
            """,
            name="valid_task_status_transitions",
        ),
    )


class Release(sqlmodel.SQLModel, table=True):
    # We guarantee that "{project.name}-{version}" is unique
    # Therefore we can use that for the name
    name: str = sqlmodel.Field(default="", primary_key=True, unique=True)
    stage: ReleaseStage
    phase: ReleasePhase
    created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime))
    released: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))

    # Many-to-one: A release belongs to one project, a project can have multiple releases
    project_name: str = sqlmodel.Field(foreign_key="project.name")
    project: Project = sqlmodel.Relationship(back_populates="releases")

    package_managers: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))
    # TODO: Not all releases have a version
    # We could either make this str | None, or we could require version to be set on packages only
    # For example, Apache Airflow Providers do not have an overall version
    # They have one version per package, i.e. per provider
    version: str
    revision: str | None = sqlmodel.Field(default=None, index=True)
    sboms: list[str] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))

    # Many-to-one: A release can have one release policy, a release policy can be used by multiple releases
    release_policy_id: int | None = sqlmodel.Field(default=None, foreign_key="releasepolicy.id")
    release_policy: ReleasePolicy | None = sqlmodel.Relationship(
        cascade_delete=True, sa_relationship_kwargs={"cascade": "all, delete-orphan", "single_parent": True}
    )

    votes: list[VoteEntry] = sqlmodel.Field(default_factory=list, sa_column=sqlalchemy.Column(sqlalchemy.JSON))

    vote_started: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))
    vote_resolved: datetime.datetime | None = sqlmodel.Field(default=None, sa_column=sqlalchemy.Column(UTCDateTime))

    # One-to-many: A release can have multiple tasks
    tasks: list["Task"] = sqlmodel.Relationship(
        back_populates="release", sa_relationship_kwargs={"cascade": "all, delete-orphan"}
    )

    # One-to-many: A release can have multiple check results
    check_results: list["CheckResult"] = sqlmodel.Relationship(back_populates="release")

    # The combination of project_name and version must be unique
    __table_args__ = (sqlalchemy.UniqueConstraint("project_name", "version", name="unique_project_version"),)

    @property
    def committee(self) -> Committee | None:
        """Get the committee for the release."""
        project = self.project
        if project is None:
            return None
        return project.committee

    @property
    def short_display_name(self) -> str:
        """Get the short display name for the release."""
        return f"{self.project.short_display_name} {self.version}"

    @property
    def unwrap_revision(self) -> str:
        """Get the revision for the release."""
        if self.revision is None:
            raise ValueError("Release has no revision")
        return self.revision


class SSHKey(sqlmodel.SQLModel, table=True):
    fingerprint: str = sqlmodel.Field(primary_key=True)
    key: str
    asf_uid: str


class CheckResultStatus(str, enum.Enum):
    EXCEPTION = "exception"
    FAILURE = "failure"
    SUCCESS = "success"
    WARNING = "warning"


class CheckResult(sqlmodel.SQLModel, table=True):
    id: int = sqlmodel.Field(default=None, primary_key=True)
    release_name: str = sqlmodel.Field(foreign_key="release.name")
    release: Release = sqlmodel.Relationship(back_populates="check_results")
    checker: str
    primary_rel_path: str | None = sqlmodel.Field(default=None, index=True)
    created: datetime.datetime = sqlmodel.Field(sa_column=sqlalchemy.Column(UTCDateTime))
    status: CheckResultStatus
    message: str
    data: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON))

    # Link to the draft revisions that these results are for
    histories: list["CheckResultHistoryLink"] = sqlmodel.Relationship(back_populates="check_result")


class CheckResultHistoryLink(sqlmodel.SQLModel, table=True):
    # Composite primary key, automatically handled by SQLModel
    check_result_id: int = sqlmodel.Field(foreign_key="checkresult.id", primary_key=True)
    draft_revision: str = sqlmodel.Field(index=True, primary_key=True)

    check_result: CheckResult = sqlmodel.Relationship(back_populates="histories")


class TextValue(sqlmodel.SQLModel, table=True):
    # Composite primary key, automatically handled by SQLModel
    ns: str = sqlmodel.Field(primary_key=True, index=True)
    key: str = sqlmodel.Field(primary_key=True, index=True)
    value: str = sqlmodel.Field()


@event.listens_for(Release, "before_insert")
def check_release_name(_mapper: sqlalchemy.orm.Mapper, _connection: sqlalchemy.Connection, release: Release) -> None:
    if release.name == "":
        release.name = release_name(release.project.name, release.version)


def project_version(release_name: str) -> tuple[str, str]:
    """Return the project and version for a given release name."""
    try:
        project_name, version_name = release_name.rsplit("-", 1)
        return (project_name, version_name)
    except ValueError:
        raise ValueError(f"Invalid release name: {release_name}")


def release_name(project_name: str, version_name: str) -> str:
    """Return the release name for a given project and version."""
    return f"{project_name}-{version_name}"
