utils/glam_subdags/histograms.py
from airflow.models import DAG

from utils.gcp import bigquery_etl_query

# Task name of the final aggregation step, kept at module level (presumably so
# other DAG modules can reference the task by name).
GLAM_HISTOGRAM_AGGREGATES_FINAL_SUBDAG = "clients_histogram_aggregates"


def histogram_aggregates_subdag(
    parent_dag_name,
    child_dag_name,
    default_args,
    schedule_interval,
    dataset_id,
    fully_qualified_dataset,
    billing_project_id,
    table_project_id="moz-fx-data-shared-prod",
    is_dev=False,
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
):
    """Build the clients_histogram_aggregates subdag.

    Chains two bigquery-etl queries: one that computes the day's new
    histogram aggregates, and one that produces the final
    clients_histogram_aggregates_v2 table from them.
    """
    GLAM_HISTOGRAM_AGGREGATES_SUBDAG = f"{parent_dag_name}.{child_dag_name}"
    # Each run builds on the previous day's output, so runs must execute
    # strictly in date order.
    default_args["depends_on_past"] = True

    dag = DAG(
        GLAM_HISTOGRAM_AGGREGATES_SUBDAG,
        default_args=default_args,
        schedule_interval=schedule_interval,
    )
    # Step 1: compute the day's new histogram aggregates into a staging table,
    # replacing any output from a previous attempt for the same date.
    clients_histogram_aggregates_new = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="clients_histogram_aggregates_new",
        destination_table="clients_histogram_aggregates_new_v1",
        dataset_id=fully_qualified_dataset,
        sql_file_path=f"sql/{table_project_id}/{dataset_id}/clients_histogram_aggregates_new_v1/query.sql",
        project_id=billing_project_id,
        date_partition_parameter=None,
        parameters=("submission_date:DATE:{{ds}}",),
        arguments=("--replace",),
        dag=dag,
        docker_image=docker_image,
    )
    # Step 2: fold the new aggregates into the full
    # clients_histogram_aggregates_v2 table.
    clients_histogram_aggregates_final = bigquery_etl_query(
        reattach_on_restart=True,
        task_id="clients_histogram_aggregates_v2",
        destination_table="clients_histogram_aggregates_v2",
        dataset_id=fully_qualified_dataset,
        sql_file_path=f"sql/{table_project_id}/{dataset_id}/clients_histogram_aggregates_v2/query.sql",
        project_id=billing_project_id,
        depends_on_past=True,
        parameters=("submission_date:DATE:{{ds}}",),
        date_partition_parameter=None,
        arguments=("--replace",),
        dag=dag,
        docker_image=docker_image,
    )
    # The final merge runs only after the day's new aggregates have landed.
    clients_histogram_aggregates_new >> clients_histogram_aggregates_final

    return dag
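
# Usage sketch (hypothetical names and values; the real wiring lives in the
# parent DAG module): the returned DAG is meant to be embedded via a
# SubDagOperator whose task_id matches child_dag_name, so that the subdag's
# dag_id f"{parent_dag_name}.{child_dag_name}" lines up with what Airflow
# expects. Something like:
#
#     from airflow.operators.subdag import SubDagOperator
#
#     histogram_aggregates = SubDagOperator(
#         subdag=histogram_aggregates_subdag(
#             parent_dag_name=dag.dag_id,
#             child_dag_name="histogram_aggregates",
#             default_args=default_args,
#             schedule_interval=dag.schedule_interval,
#             dataset_id="glam_etl",
#             fully_qualified_dataset="moz-fx-data-shared-prod.glam_etl",
#             billing_project_id="moz-fx-data-shared-prod",
#         ),
#         task_id="histogram_aggregates",
#         dag=dag,
#     )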