dags/adm_export.py (69 lines of code) (raw):

"""Airflow DAG that ships the adMarketplace daily aggregates table to SFTP."""

import datetime

from airflow import DAG
from airflow.hooks.base import BaseHook  # noqa: F401 - no longer used below; import retained
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.sensors.external_task import ExternalTaskSensor

from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag

# Rendered as the DAG's documentation (passed to `doc_md` below).
DOCS = """\
Daily data exports of contextual services data aggregates to adMarketplace.
This is a complementary approach to the near real-time sharing that is implemented
in gcp-ingestion.

Relies on the [`bq2sftp` container defined in `docker-etl`](https://github.com/mozilla/docker-etl/tree/main/jobs/bq2sftp)
and credentials stored in the `adm_sftp` connection.

For more context, see https://bugzilla.mozilla.org/show_bug.cgi?id=1729524
"""

# Defaults applied to every task in the DAG unless a task overrides them.
default_args = {
    "owner": "wstuckey@mozilla.com",
    "start_date": datetime.datetime(2019, 7, 25),
    "email": ["telemetry-alerts@mozilla.com", "wstuckey@mozilla.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    "depends_on_past": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

dag_name = "adm_export"
tags = [Tag.ImpactTier.tier_3]

# The SFTP password is not read from the Airflow connection: it is taken from
# the `airflow-gke-secrets` Kubernetes secret and exposed to the pod as the
# SFTP_PASSWORD environment variable.
adm_sftp_secret = Secret(
    deploy_type="env",
    deploy_target="SFTP_PASSWORD",
    secret="airflow-gke-secrets",
    key="adm_export_secret__sftp_password",
)

with DAG(
    dag_name,
    schedule_interval="0 5 * * *",
    doc_md=DOCS,
    default_args=default_args,
    tags=tags,
) as dag:
    adm_daily_aggregates_to_sftp = GKEPodOperator(
        task_id="adm_daily_aggregates_to_sftp",
        name="adm_daily_aggregates_to_sftp",
        # See https://github.com/mozilla/docker-etl/pull/28
        image="gcr.io/moz-fx-data-airflow-prod-88e0/bq2sftp_docker_etl:latest",
        project_id="moz-fx-data-airflow-gke-prod",
        gcp_conn_id="google_cloud_airflow_gke",
        cluster_name="workloads-prod-v1",
        location="us-west1",
        env_vars={
            # The `adm_sftp` connection fields are resolved through the `conn`
            # template variable when the task is rendered, instead of calling
            # BaseHook.get_connection() at module import time. Parsing this
            # file therefore no longer looks up the connection, and the DAG
            # still imports in environments where `adm_sftp` is not defined.
            "SFTP_USERNAME": "{{ conn.adm_sftp.login }}",
            "SFTP_HOST": "{{ conn.adm_sftp.host }}",
            "SFTP_PORT": "{{ conn.adm_sftp.port }}",
            "KNOWN_HOSTS": "{{ conn.adm_sftp.extra_dejson.known_hosts }}",
            "SRC_TABLE": "moz-fx-data-shared-prod.search_terms_derived.adm_daily_aggregates_v1",
            # The run for submission_date=2022-03-04 will be named:
            # Aggregated-Query-Data-03042022.csv.gz
            "DST_PATH": 'files/Aggregated-Query-Data-{{ macros.ds_format(ds, "%Y-%m-%d", "%m%d%Y") }}.csv.gz',
            "SUBMISSION_DATE": "{{ ds }}",
        },
        secrets=[adm_sftp_secret],
        # Overrides the `email` list from default_args for this task.
        email=[
            "telemetry-alerts@mozilla.com",
        ],
    )

    # Blocks the export until the upstream task that builds the source table
    # has finished for the matching run.
    wait_for_adm_daily_aggregates = ExternalTaskSensor(
        task_id="wait_for_adm_daily_aggregates",
        external_dag_id="bqetl_search_terms_daily",
        external_task_id="search_terms_derived__adm_daily_aggregates__v1",
        # Look at the upstream run whose logical date is 2 hours before this
        # DAG's (05:00 schedule -> 03:00).
        # NOTE(review): presumably bqetl_search_terms_daily runs at 03:00 - confirm.
        execution_delta=datetime.timedelta(hours=2),
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        # Overrides email_on_retry=True from default_args for the sensor.
        email_on_retry=False,
    )

    wait_for_adm_daily_aggregates >> adm_daily_aggregates_to_sftp