import operator
from datetime import datetime, timedelta
from functools import partial, reduce

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.utils.task_group import TaskGroup

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.glam_subdags.generate_query import (
    generate_and_run_glean_task,
)
from utils.tags import Tag

default_args = {
    "owner": "efilho@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2024, 12, 11),
    "email": [
        "telemetry-alerts@mozilla.com",
        "akomarzewski@mozilla.com",
        "efilho@mozilla.com",
    ],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
}

PROJECT = "moz-fx-glam-prod"

tags = [Tag.ImpactTier.tier_2]

with DAG(
    "glam_fog_release",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="0 10 * * 6",  # 10am on Saturday
    tags=tags,
) as dag:
    wait_for_glam_fog = ExternalTaskSensor(
        task_id="wait_for_daily_glam_fog_release",
        external_dag_id="glam_fog",
        external_task_id="daily_release_done",
        execution_delta=timedelta(days=-5, hours=-16),
        check_existence=True,
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
    )

    fog_release_done = EmptyOperator(
        task_id="fog_release_done",
    )

    for product in ["firefox_desktop_glam_release"]:
        func = partial(
            generate_and_run_glean_task,
            product=product,
            destination_project_id=PROJECT,
            env_vars={"STAGE": "incremental"},
        )
        view, init, query = (
            partial(func, task_type=task_type)
            for task_type in ["view", "init", "query"]
        )

        # stage 2 - downstream for export
        scalar_bucket_counts = query(task_name=f"{product}__scalar_bucket_counts_v1")
        scalar_probe_counts = query(task_name=f"{product}__scalar_probe_counts_v1")

        with TaskGroup(
            group_id=f"{product}__histogram_bucket_counts_v1", dag=dag, default_args=default_args
        ) as histogram_bucket_counts:
            prev_task = None
            # Windows + Release data is in [0-9] so we're further splitting that range.
            for sample_range in (
                [0, 0], [1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6],
                [7, 7], [8, 8], [9, 9], [10, 19], [20, 29], [30, 39],
                [40, 49], [50, 59], [60, 69], [70, 79], [80, 89], [90, 99]
            ):
                histogram_bucket_counts_sampled = query(
                    task_name=(
                        f"{product}__histogram_bucket_counts_v1_sampled_"
                        f"{sample_range[0]}_{sample_range[1]}"
                    ),
                    min_sample_id=sample_range[0],
                    max_sample_id=sample_range[1],
                    replace_table=(sample_range[0] == 0)
                )
                if prev_task:
                    histogram_bucket_counts_sampled.set_upstream(prev_task)
                prev_task = histogram_bucket_counts_sampled

        histogram_probe_counts = query(
            task_name=f"{product}__histogram_probe_counts_v1"
        )

        probe_counts = view(task_name=f"{product}__view_probe_counts_v1")
        extract_probe_counts = query(task_name=f"{product}__extract_probe_counts_v1")

        user_counts = view(task_name=f"{product}__view_user_counts_v1")
        extract_user_counts = query(task_name=f"{product}__extract_user_counts_v1")

        sample_counts = view(task_name=f"{product}__view_sample_counts_v1")

        (
            wait_for_glam_fog
            >> scalar_bucket_counts
            >> scalar_probe_counts
            >> probe_counts
        )
        (
            wait_for_glam_fog
            >> histogram_bucket_counts
            >> histogram_probe_counts
            >> probe_counts
        )
        probe_counts >> sample_counts >> extract_probe_counts >> fog_release_done
        (
            wait_for_glam_fog
            >> user_counts
            >> extract_user_counts
            >> fog_release_done
        )
        wait_for_glam_fog >> fog_release_done
