import datetime

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag

DOCS = """\
Daily data exports of contextual services data aggregates to adMarketplace.
This is a complementary approach to the near real-time sharing that is implemented
in gcp-ingestion.

Relies on the [`bq2stfp` container defined in `docker-etl`](https://github.com/mozilla/docker-etl/tree/main/jobs/bq2sftp)
and credentials stored in the `adm_sftp` connection.

For more context, see https://bugzilla.mozilla.org/show_bug.cgi?id=1729524
"""

default_args = {
    "owner": "wstuckey@mozilla.com",
    "start_date": datetime.datetime(2019, 7, 25),
    "email": ["telemetry-alerts@mozilla.com", "wstuckey@mozilla.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    "depends_on_past": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

dag_name = "adm_export"
tags = [Tag.ImpactTier.tier_3]

adm_sftp_secret = Secret(
    deploy_type="env",
    deploy_target="SFTP_PASSWORD",
    secret="airflow-gke-secrets",
    key="adm_export_secret__sftp_password",
)

with DAG(
    dag_name,
    schedule_interval="0 5 * * *",
    doc_md=DOCS,
    default_args=default_args,
    tags=tags,
) as dag:
    conn = BaseHook.get_connection("adm_sftp")

    adm_daily_aggregates_to_sftp = GKEPodOperator(
        task_id="adm_daily_aggregates_to_sftp",
        name="adm_daily_aggregates_to_sftp",
        # See https://github.com/mozilla/docker-etl/pull/28
        image="gcr.io/moz-fx-data-airflow-prod-88e0/bq2sftp_docker_etl:latest",
        project_id="moz-fx-data-airflow-gke-prod",
        gcp_conn_id="google_cloud_airflow_gke",
        cluster_name="workloads-prod-v1",
        location="us-west1",
        env_vars={
            "SFTP_USERNAME": conn.login,
            "SFTP_HOST": conn.host,
            "SFTP_PORT": str(conn.port),
            "KNOWN_HOSTS": conn.extra_dejson["known_hosts"],
            "SRC_TABLE": "moz-fx-data-shared-prod.search_terms_derived.adm_daily_aggregates_v1",
            # The run for submission_date=2022-03-04 will be named:
            # Aggregated-Query-Data-03042022.csv.gz
            "DST_PATH": 'files/Aggregated-Query-Data-{{ macros.ds_format(ds, "%Y-%m-%d", "%m%d%Y") }}.csv.gz',
            "SUBMISSION_DATE": "{{ ds }}",
        },
        secrets=[adm_sftp_secret],
        email=[
            "telemetry-alerts@mozilla.com",
        ],
    )

    wait_for_clients_daily_export = ExternalTaskSensor(
        task_id="wait_for_adm_daily_aggregates",
        external_dag_id="bqetl_search_terms_daily",
        external_task_id="search_terms_derived__adm_daily_aggregates__v1",
        execution_delta=datetime.timedelta(hours=2),
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
    )

    wait_for_clients_daily_export >> adm_daily_aggregates_to_sftp
