utils/glam_subdags/histograms.py

from airflow.models import DAG

from utils.gcp import bigquery_etl_query

# Child DAG name of the final aggregation step, for reference by parent DAGs.
GLAM_HISTOGRAM_AGGREGATES_FINAL_SUBDAG = "clients_histogram_aggregates"


def histogram_aggregates_subdag(
    parent_dag_name,
    child_dag_name,
    default_args,
    schedule_interval,
    dataset_id,
    fully_qualified_dataset,
    billing_project_id,
    table_project_id="moz-fx-data-shared-prod",
    is_dev=False,  # accepted by callers but unused in this subdag
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
):
    """Build the subdag that computes GLAM's clients_histogram_aggregates.

    Runs two bigquery-etl queries in sequence: first materialize the
    current day's aggregates into clients_histogram_aggregates_new_v1,
    then update the cumulative clients_histogram_aggregates_v2 table.
    Each run builds on the previous one, hence depends_on_past=True.
    """
    GLAM_HISTOGRAM_AGGREGATES_SUBDAG = f"{parent_dag_name}.{child_dag_name}"
    # Note: mutates the caller's default_args dict.
    default_args["depends_on_past"] = True
    dag = DAG(
        GLAM_HISTOGRAM_AGGREGATES_SUBDAG,
        default_args=default_args,
        schedule_interval=schedule_interval,
    )

    # Step 1: compute the day's new aggregates, replacing any prior output
    # for this submission_date.
    clients_histogram_aggregates_new = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="clients_histogram_aggregates_new",
        destination_table="clients_histogram_aggregates_new_v1",
        dataset_id=fully_qualified_dataset,
        sql_file_path=f"sql/{table_project_id}/{dataset_id}/clients_histogram_aggregates_new_v1/query.sql",
        project_id=billing_project_id,
        date_partition_parameter=None,
        parameters=("submission_date:DATE:{{ds}}",),
        arguments=("--replace",),
        dag=dag,
        docker_image=docker_image,
    )

    # Step 2: fold the new aggregates into the cumulative v2 table.
    clients_histogram_aggregates_final = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="clients_histogram_aggregates_v2",
        destination_table="clients_histogram_aggregates_v2",
        dataset_id=fully_qualified_dataset,
        sql_file_path=f"sql/{table_project_id}/{dataset_id}/clients_histogram_aggregates_v2/query.sql",
        project_id=billing_project_id,
        depends_on_past=True,
        parameters=("submission_date:DATE:{{ds}}",),
        date_partition_parameter=None,
        arguments=("--replace",),
        dag=dag,
        docker_image=docker_image,
    )

    clients_histogram_aggregates_new >> clients_histogram_aggregates_final
    return dag
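
For context, a minimal usage sketch of how a parent DAG might attach this subdag via SubDagOperator. The parent DAG id, schedule, default_args, and dataset/project values below are illustrative assumptions, not values from this module; the hard requirements are that parent_dag_name matches the parent dag_id and that child_dag_name matches the SubDagOperator task_id.

# Usage sketch (illustrative; values below are assumed, not from this repo).
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator  # airflow.operators.subdag on Airflow 2.x

from utils.glam_subdags.histograms import histogram_aggregates_subdag

default_args = {
    "owner": "glam-team@example.com",    # assumed
    "start_date": datetime(2020, 1, 1),  # assumed
    "retries": 1,
    "retry_delay": timedelta(minutes=30),
}

dag = DAG("glam", default_args=default_args, schedule_interval="0 2 * * *")

clients_histogram_aggregates = SubDagOperator(
    subdag=histogram_aggregates_subdag(
        "glam",                           # must match the parent dag_id
        "clients_histogram_aggregates",   # must match this operator's task_id
        default_args,
        dag.schedule_interval,
        dataset_id="glam_etl",                                       # assumed
        fully_qualified_dataset="moz-fx-data-shared-prod.glam_etl",  # assumed
        billing_project_id="moz-fx-data-shared-prod",                # assumed
    ),
    task_id="clients_histogram_aggregates",
    dag=dag,
)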