dags/app_store_analytics.py:

"""
Export Apple App Store Connect analytics for Mozilla's iOS apps into BigQuery,
then build derived per-dimension metrics tables from the exported data.
"""

from datetime import datetime, timedelta

from airflow import DAG
from operators.gcp_container_operator import GKEPodOperator
from utils.gcp import bigquery_etl_query
from utils.tags import Tag

default_args = {
    "owner": "telemetry-alerts@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2020, 6, 23),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
    "email": [
        "telemetry-alerts@mozilla.com",
    ],
}

PROJECT_ID = "moz-fx-data-marketing-prod"
EXPORT_DATASET_ID = "apple_app_store_exported"
DERIVED_DATASET_ID = "apple_app_store"

APPS = [
    ("989804926", "Firefox"),
    ("1489407738", "VPN"),
    ("1295998056", "WebXRViewer"),
    ("1314000270", "Lockwise"),
    ("1073435754", "Klar"),
    ("1055677337", "Focus"),
]

DERIVED_TABLES = [
    "metrics_by_app_referrer",
    "metrics_by_app_version",
    "metrics_by_campaign",
    "metrics_by_platform",
    "metrics_by_platform_version",
    "metrics_by_region",
    "metrics_by_source",
    "metrics_by_storefront",
    "metrics_by_web_referrer",
    "metrics_total",
]

tags = [Tag.ImpactTier.tier_1]

with DAG(
    "app_store_analytics",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    tags=tags,
) as dag:
    # Jinja expression embedded in the templated arguments below; the previous day's
    # data is incomplete, so exports run two days behind the execution date
    export_date = "macros.ds_add(ds, -2)"

    tasks = []

    # App exports are scheduled sequentially to avoid hitting the API rate limit
    for i, (app_id, app_name) in enumerate(APPS):
        commands = [
            "yarn",
            "--silent",  # silent to hide arguments from logs
            "export",
            "--username={{ var.value.app_store_connect_username }}",
            "--password={{ var.value.app_store_connect_password }}",
            f"--app-id={app_id}",
            f"--app-name={app_name}",
            f"--start-date={{{{ {export_date} }}}}",
            f"--project={PROJECT_ID}",
            f"--dataset={EXPORT_DATASET_ID}",
        ]

        # The first task clears the day partition so that the table partition contains only
        # data written by the current DAG run and does not include unrecognized apps
        if i == 0:
            commands.append("--overwrite")

        app_store_analytics = GKEPodOperator(
            task_id=f"app_store_analytics_{app_name}",
            arguments=commands,
            image="gcr.io/moz-fx-data-airflow-prod-88e0/app-store-analytics-export:latest",
            gcp_conn_id="google_cloud_airflow_gke",
            dag=dag,
        )

        if i > 0:
            app_store_analytics.set_upstream(tasks[i - 1])

        tasks.append(app_store_analytics)

    # Derived tables combine all metrics per dimension
    for derived_table in DERIVED_TABLES:
        combined_metrics_query = bigquery_etl_query(
            task_id=f"{derived_table}_query",
            project_id=PROJECT_ID,
            dataset_id=DERIVED_DATASET_ID,
            sql_file_path=f"sql/moz-fx-data-marketing-prod/{DERIVED_DATASET_ID}/{derived_table}/query.sql",
            # Override the default date partition because the data has a multi-day lag
            destination_table=(
                f"{derived_table}${{{{ macros.ds_format({export_date}, '%Y-%m-%d', '%Y%m%d') }}}}"
            ),
            date_partition_parameter=None,
            parameters=[f"submission_date:DATE:{{{{ {export_date} }}}}"],
            dag=dag,
        )

        combined_metrics_query.set_upstream(tasks[-1])