dags/dap_collector.py (45 lines of code) (raw):

from datetime import datetime, timedelta from airflow import DAG from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag DOCS = """ ### DAP Collector #### Description Runs a Docker image that collects data from a DAP (Distributed Aggregation Protocol) leader and stores it in BigQuery. The container is defined in [docker-etl](https://github.com/mozilla/docker-etl/tree/main/jobs/dap-collector) For more information on Privacy Preserving Measurement in Firefox see https://bugzilla.mozilla.org/show_bug.cgi?id=1775035 This DAG requires following variables to be defined in Airflow: * dap_auth_token * dap_hpke_private_key * dap_task_config_url This job is under active development, occasional failures are expected. #### Owner sfriedberger@mozilla.com """ default_args = { "owner": "sfriedberger@mozilla.com", "email": ["akomarzewski@mozilla.com", "sfriedberger@mozilla.com"], "depends_on_past": False, "start_date": datetime(2023, 3, 8), "email_on_failure": True, "email_on_retry": True, "retries": 1, "retry_delay": timedelta(hours=2), } project_id = "moz-fx-data-shared-prod" table_id = "dap_collector_derived.aggregates_v1" tags = [ Tag.ImpactTier.tier_3, Tag.Triage.no_triage, ] with DAG( "dap_collector", default_args=default_args, doc_md=DOCS, schedule_interval="@daily", tags=tags, ) as dag: dap_collector = GKEPodOperator( task_id="dap_collector", arguments=[ "python", "dap_collector/main.py", "--date={{ ds }}", "--auth-token={{ var.value.dap_auth_token }}", "--hpke-private-key={{ var.value.dap_hpke_private_key }}", "--task-config-url={{ var.value.dap_task_config_url }}", "--project", project_id, "--table-id", table_id, ], image="gcr.io/moz-fx-data-airflow-prod-88e0/dap-collector_docker_etl:latest", gcp_conn_id="google_cloud_airflow_gke", )