import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker
from airflow.utils.task_group import TaskGroup
from kubernetes.client import models as k8s

from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import (
    bigquery_etl_copy_deduplicate,
    bigquery_etl_query,
)
from utils.tags import Tag

DOCS = """\
# Copy-Deduplicate

This DAG is the root of most derived tables. For each live ping table that the
data pipeline populates, we run a "copy_deduplicate" query once per day to
populate the corresponding stable table.

A few immediate downstream tables are also included in this DAG.

## Workflows

In early 2021, manual reruns of `copy_deduplicate` were leading to empty
partitions, but the root cause has been fixed. See
[bug 1690363](https://bugzilla.mozilla.org/show_bug.cgi?id=1690363).

## Changelog

In April 2021, `copy_deduplicate_main_ping` was moved from a 100-slice
configuration to a single-query configuration, which will change the
performance profile and is intended to be more efficient and slightly
faster. We also increased the number of parallel queries in
`copy_deduplicate_all` to help it finish more quickly and split out
`copy_deduplicate_event_ping` to its own task.
See [telemetry-airflow#1279](
https://github.com/mozilla/telemetry-airflow/pull/1279/files)
"""

# Task defaults for the DAG; individual tasks below override some of these
# (e.g. `owner`, `email`) by passing them explicitly.
default_args = dict(
    owner="akomarzewski@mozilla.com",
    start_date=datetime.datetime(year=2019, month=7, day=25),
    # Addresses notified on failure and on retry (both flags are enabled).
    email=[
        "telemetry-alerts@mozilla.com",
        "dataops+alerts@mozilla.com",
        "akomarzewski@mozilla.com",
    ],
    email_on_failure=True,
    email_on_retry=True,
    depends_on_past=False,
    # A failed task gets two more attempts, each after a wait of at least
    # five minutes.
    retries=2,
    retry_delay=datetime.timedelta(minutes=5),
)

dag_name = "copy_deduplicate"
tags = [Tag.ImpactTier.tier_1]


def _add_external_task_markers(downstream_dependencies, external_task_id):
    """Create one ExternalTaskMarker per downstream DAG.

    Like every other operator in this file, the markers are created without an
    explicit ``dag``/``task_group`` argument, so this must be called from inside
    the enclosing ``with models.DAG(...)`` and ``with TaskGroup(...)`` blocks.

    :param downstream_dependencies: ordered ``(external_dag_id, replace_args)``
        pairs. ``replace_args`` is the keyword-argument source text
        (e.g. ``"hour=2, minute=0"``) spliced into
        ``execution_date.replace(...)`` in the templated ``execution_date`` —
        it replaces the hour/minute, it is not a relative delta.
    :param external_task_id: ID of the waiting task in each downstream DAG;
        also used as the suffix of each marker's own task ID
        (``"<external_dag_id>__<external_task_id>"``).
    """
    for external_dag_id, replace_args in downstream_dependencies:
        ExternalTaskMarker(
            task_id=f"{external_dag_id}__{external_task_id}",
            external_dag_id=external_dag_id,
            external_task_id=external_task_id,
            execution_date="{{ execution_date.replace("
            + replace_args
            + ").isoformat() }}",
        )


# NOTE: downstream dependencies below are ordered tuples rather than sets so
# that markers are created in the same order on every parse (set iteration
# order over strings depends on hash randomization). Each entry must be unique,
# otherwise two markers would get the same task_id.
with models.DAG(
    dag_name,
    schedule_interval="0 1 * * *",
    doc_md=DOCS,
    default_args=default_args,
    tags=tags,
) as dag:
    # Pod resource requests shared by the two catch-all copy tasks below.
    resources = k8s.V1ResourceRequirements(
        requests={"memory": "400Mi"},
    )

    # Copy every table in the _live datasets into the corresponding _stable
    # dataset, except the tables listed in except_tables, which are copied by
    # the dedicated tasks further down in this DAG. Multiple copy queries may
    # run concurrently (parallelism=10; see the changelog in DOCS).
    copy_deduplicate_all_base = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_all_base",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        priority_weight=100,
        parallelism=10,
        # Any table listed here under except_tables _must_ have a corresponding
        # copy_deduplicate job elsewhere.
        except_tables=[
            "telemetry_live.main_v4",
            "telemetry_live.main_use_counter_v4",
            "telemetry_live.main_v5",
            "telemetry_live.event_v4",
            "telemetry_live.first_shutdown_v4",
            "telemetry_live.first_shutdown_use_counter_v4",
            "telemetry_live.first_shutdown_v5",
            "telemetry_live.saved_session_v4",
            "telemetry_live.saved_session_use_counter_v4",
            "telemetry_live.saved_session_v5",
            "firefox_desktop_live.metrics_v1",
        ],
        container_resources=resources,
    )

    # firefox_desktop_live.metrics_v1 is excluded above and copied here on its
    # own with hourly=True (presumably hour-by-hour slices — confirm against
    # bigquery_etl_copy_deduplicate).
    copy_deduplicate_sliced = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_sliced",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        priority_weight=100,
        parallelism=5,
        hourly=True,
        only_tables=[
            "firefox_desktop_live.metrics_v1",
        ],
        container_resources=resources,
    )

    # EmptyOperator is used instead of a task group to maintain compatibility with downstream sensors
    copy_deduplicate_all = EmptyOperator(
        task_id="copy_deduplicate_all",
    )

    (
        copy_deduplicate_sliced,
        copy_deduplicate_all_base,
    ) >> copy_deduplicate_all

    with TaskGroup("copy_deduplicate_all_external") as copy_deduplicate_all_external:
        # (external DAG id, execution_date.replace() arguments) pairs.
        _add_external_task_markers(
            (
                ("bhr_collection", "hour=5, minute=0"),
                ("glam_fenix", "hour=2, minute=0"),
                ("glam_fog", "hour=2, minute=0"),
                ("bqetl_activity_stream", "hour=2, minute=0"),
                ("bqetl_amo_stats", "hour=3, minute=0"),
                ("bqetl_core", "hour=2, minute=0"),
                ("bqetl_ctxsvc_derived", "hour=3, minute=0"),
                ("bqetl_desktop_funnel", "hour=4, minute=0"),
                ("bqetl_event_rollup", "hour=3, minute=0"),
                ("bqetl_experiments_daily", "hour=3, minute=0"),
                ("bqetl_feature_usage", "hour=5, minute=0"),
                ("bqetl_fenix_event_rollup", "hour=2, minute=0"),
                ("bqetl_firefox_ios", "hour=4, minute=0"),
                ("bqetl_fog_decision_support", "hour=4, minute=0"),
                ("bqetl_internal_tooling", "hour=4, minute=0"),
                ("bqetl_internet_outages", "hour=7, minute=0"),
                ("bqetl_messaging_system", "hour=2, minute=0"),
                ("bqetl_main_summary", "hour=2, minute=0"),
                ("bqetl_mobile_activation", "hour=0, minute=0"),
                ("bqetl_mobile_search", "hour=2, minute=0"),
                ("bqetl_monitoring", "hour=2, minute=0"),
                ("bqetl_newtab", "hour=0, minute=0"),
                ("bqetl_org_mozilla_fenix_derived", "hour=2, minute=0"),
                ("bqetl_org_mozilla_firefox_derived", "hour=2, minute=0"),
                ("bqetl_org_mozilla_focus_derived", "hour=2, minute=0"),
                ("bqetl_public_data_json", "hour=5, minute=0"),
                ("bqetl_regrets_reporter_summary", "hour=4, minute=0"),
                ("bqetl_search_terms_daily", "hour=3, minute=0"),
                ("bqetl_sponsored_tiles_clients_daily", "hour=4, minute=0"),
                ("bqetl_urlbar", "hour=3, minute=0"),
            ),
            external_task_id="wait_for_copy_deduplicate_all",
        )

        copy_deduplicate_all >> copy_deduplicate_all_external

    # We split out main ping since it's the highest volume and has a distinct
    # set of downstream dependencies.
    copy_deduplicate_main_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_main_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=[
            "telemetry_live.main_v4",
            "telemetry_live.main_use_counter_v4",
            "telemetry_live.main_v5",
        ],
        priority_weight=100,
        parallelism=5,
        slices=20,
        owner="akomarzewski@mozilla.com",
        email=[
            "telemetry-alerts@mozilla.com",
            "akomarzewski@mozilla.com",
        ],
    )

    with TaskGroup("main_ping_external") as main_ping_external:
        _add_external_task_markers(
            (
                ("graphics_telemetry", "hour=3, minute=0"),
                ("glam", "hour=2, minute=0"),
                ("bqetl_addons", "hour=4, minute=0"),
                ("bqetl_amo_stats", "hour=3, minute=0"),
                ("bqetl_desktop_platform", "hour=3, minute=0"),
                ("bqetl_devtools", "hour=3, minute=0"),
                ("bqetl_experiments_daily", "hour=3, minute=0"),
                ("bqetl_fog_decision_support", "hour=4, minute=0"),
                ("bqetl_internet_outages", "hour=7, minute=0"),
                ("bqetl_main_summary", "hour=2, minute=0"),
                ("bqetl_monitoring", "hour=2, minute=0"),
                ("bqetl_public_data_json", "hour=5, minute=0"),
                ("bqetl_sponsored_tiles_clients_daily", "hour=4, minute=0"),
                ("bqetl_ssl_ratios", "hour=2, minute=0"),
            ),
            external_task_id="wait_for_copy_deduplicate_main_ping",
        )

        copy_deduplicate_main_ping >> main_ping_external

    # We also separate out variant pings that share the main ping schema since these
    # ultrawide tables can sometimes have unique performance problems.
    copy_deduplicate_first_shutdown_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_first_shutdown_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=[
            "telemetry_live.first_shutdown_v4",
            "telemetry_live.first_shutdown_use_counter_v4",
            "telemetry_live.first_shutdown_v5",
        ],
        priority_weight=50,
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )

    with TaskGroup("first_shutdown_ping_external") as first_shutdown_ping_external:
        _add_external_task_markers(
            (("bqetl_analytics_tables", "hour=2, minute=0"),),
            external_task_id="wait_for_copy_deduplicate_first_shutdown_ping",
        )

        copy_deduplicate_first_shutdown_ping >> first_shutdown_ping_external

    copy_deduplicate_saved_session_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_saved_session_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=[
            "telemetry_live.saved_session_v4",
            "telemetry_live.saved_session_use_counter_v4",
            "telemetry_live.saved_session_v5",
        ],
        priority_weight=50,
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )

    # Events.

    copy_deduplicate_event_ping = bigquery_etl_copy_deduplicate(
        task_id="copy_deduplicate_event_ping",
        target_project_id="moz-fx-data-shared-prod",
        billing_projects=("moz-fx-data-shared-prod",),
        only_tables=["telemetry_live.event_v4"],
        priority_weight=100,
        parallelism=1,
        owner="akomarzewski@mozilla.com",
    )

    event_events = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="event_events",
        project_id="moz-fx-data-shared-prod",
        destination_table="event_events_v1",
        dataset_id="telemetry_derived",
        priority_weight=90,
        owner="akomarzewski@mozilla.com",
        arguments=("--schema_update_option=ALLOW_FIELD_ADDITION",),
    )

    with TaskGroup("event_events_external") as event_events_external:
        _add_external_task_markers(
            (
                ("jetstream", "hour=4, minute=0"),
                ("bqetl_amo_stats", "hour=3, minute=0"),
                ("bqetl_experiments_daily", "hour=3, minute=0"),
                ("bqetl_feature_usage", "hour=5, minute=0"),
                ("bqetl_main_summary", "hour=2, minute=0"),
            ),
            external_task_id="wait_for_event_events",
        )

        event_events >> event_events_external

    copy_deduplicate_event_ping >> event_events

    bq_main_events = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="bq_main_events",
        project_id="moz-fx-data-shared-prod",
        destination_table="main_events_v1",
        dataset_id="telemetry_derived",
        priority_weight=90,
        owner="akomarzewski@mozilla.com",
        dag=dag,
        arguments=("--schema_update_option=ALLOW_FIELD_ADDITION",),
    )

    with TaskGroup("bq_main_events_external") as bq_main_events_external:
        _add_external_task_markers(
            (
                ("jetstream", "hour=4, minute=0"),
                ("bqetl_amo_stats", "hour=3, minute=0"),
                ("bqetl_experiments_daily", "hour=3, minute=0"),
                ("bqetl_feature_usage", "hour=5, minute=0"),
                ("bqetl_main_summary", "hour=2, minute=0"),
            ),
            external_task_id="wait_for_bq_main_events",
        )

        bq_main_events >> bq_main_events_external

    copy_deduplicate_main_ping >> bq_main_events

    # Daily and last seen views on top of every Glean application.

    # The core clients first seen dataset is a dependency to glean usage
    # queries. Ideally, it would belong inside of a generated bigquery-etl DAG
    # (e.g. bqetl_core), but this would require splitting this DAG into three
    # separate parts threaded by sensors. Since the first_seen_table will end up
    # being part of the clients daily table in this DAG, it will be easier to
    # reason about dependencies in this single DAG while it is being developed.
    telemetry_derived__core_clients_first_seen__v1 = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="telemetry_derived__core_clients_first_seen__v1",
        destination_table="core_clients_first_seen_v1",
        dataset_id="telemetry_derived",
        project_id="moz-fx-data-shared-prod",
        owner="ascholtz@mozilla.com",
        email=["ascholtz@mozilla.com", "telemetry-alerts@mozilla.com"],
        date_partition_parameter="submission_date",
        depends_on_past=True,
        dag=dag,
    )

    with TaskGroup(
        "core_clients_first_seen_external"
    ) as core_clients_first_seen_external:
        # Spelled out rather than using _add_external_task_markers: this
        # marker's task_id suffix intentionally differs from external_task_id,
        # and renaming the task would orphan its existing history.
        ExternalTaskMarker(
            task_id="bqetl_core__wait_for_core_clients_first_seen",
            external_dag_id="bqetl_core",
            external_task_id="wait_for_telemetry_derived__core_clients_first_seen__v1",
            execution_date="{{ execution_date.replace(hour=2, minute=0).isoformat() }}",
        )

        (
            telemetry_derived__core_clients_first_seen__v1
            >> core_clients_first_seen_external
        )

    copy_deduplicate_all >> telemetry_derived__core_clients_first_seen__v1
