"""Plugin for alternative timetables that cannot be trivially defined via cron expressions."""

from datetime import timedelta
from typing import Any

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import UTC, DateTime, Time


class MultiWeekTimetable(Timetable):
    def __init__(self, *, num_weeks: int, time: Time = Time.min):
        self.num_weeks = num_weeks
        self.interval_delta = timedelta(days=7 * num_weeks)
        # only enforced for automated data intervals
        self.time = time

    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        return DataInterval(start=run_after - self.interval_delta, end=run_after)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if restriction.earliest is None:  # No start_date specified. Don't schedule.
            return None

        # Find the first run on the regular schedule.
        next_end = (
            DateTime.combine(restriction.earliest, self.time).replace(tzinfo=UTC)
            + self.interval_delta
        )

        max_end = next_end
        if last_automated_data_interval is not None:
            # There was a previous run on the regular schedule.
            # Return the next interval after last_automated_data_interval.end that is
            # aligned with restriction.earliest and self.time
            max_end = last_automated_data_interval.end + self.interval_delta
        elif not restriction.catchup:
            # This is the first ever run on the regular schedule, and catchup is not
            # enabled. Return the last complete interval before now.
            max_end = DateTime.utcnow()
        if next_end < max_end:
            # Return the last complete interval on or before max_end. Use integer
            # division on the number of whole days rather than deal with any corner
            # cases related to leap seconds and partial days.
            skip_intervals = (max_end - next_end).days // self.interval_delta.days
            next_end = next_end + (self.interval_delta * skip_intervals)

        if restriction.latest is not None and next_end > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_end - self.interval_delta, end=next_end)

    def serialize(self) -> dict[str, Any]:
        return {"num_weeks": self.num_weeks, "time": self.time.isoformat()}

    @classmethod
    def deserialize(cls, value: dict[str, Any]) -> Timetable:
        return cls(num_weeks=value["num_weeks"], time=Time.fromisoformat(value["time"]))


class MozillaTimetablePlugin(AirflowPlugin):
    name = "mozilla_timetable_plugin"
    timetables = (MultiWeekTimetable,)
