utils/glam_subdags/general.py (76 lines of code) (raw):

from airflow.models import DAG from utils.gcp import bigquery_etl_query def merge_params(min_param, max_param, additional_params): parameters = ( f"min_sample_id:INT64:{min_param}", f"max_sample_id:INT64:{max_param}", ) if additional_params is not None: parameters += additional_params return parameters def repeated_subdag( parent_dag_name, child_dag_name, default_args, schedule_interval, billing_project_id, table_project_id, dataset_id, fully_qualified_dataset_id, additional_params=None, num_partitions=5, date_partition_parameter="submission_date", docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest", parallel=False, ): dag = DAG( f"{parent_dag_name}.{child_dag_name}", default_args=default_args, schedule_interval=schedule_interval, ) # This task runs first and replaces the relevant partition, followed # by the next tasks that append to the same partition of the same table. NUM_SAMPLE_IDS = 100 PARTITION_SIZE = NUM_SAMPLE_IDS // num_partitions if NUM_SAMPLE_IDS % num_partitions != 0: raise ValueError( f"Number of partitions must be a divisor " f"of the number of sample ids ({NUM_SAMPLE_IDS})" ) task_0 = bigquery_etl_query( reattach_on_restart=True, task_id=f"{child_dag_name}_0", destination_table=f"{child_dag_name}_v1", dataset_id=fully_qualified_dataset_id, sql_file_path=f"sql/{table_project_id}/{dataset_id}/{child_dag_name}_v1/query.sql", project_id=billing_project_id, depends_on_past=True, parameters=merge_params(0, PARTITION_SIZE - 1, additional_params), date_partition_parameter=date_partition_parameter, arguments=("--replace",), dag=dag, docker_image=docker_image, ) upstream_task = task_0 for partition in range(1, num_partitions): min_param = partition * PARTITION_SIZE max_param = min_param + PARTITION_SIZE - 1 task = bigquery_etl_query( reattach_on_restart=True, task_id=f"{child_dag_name}_{partition}", destination_table=f"{child_dag_name}_v1", dataset_id=fully_qualified_dataset_id, sql_file_path=f"sql/{table_project_id}/{dataset_id}/{child_dag_name}_v1/query.sql", project_id=billing_project_id, depends_on_past=True, parameters=merge_params(min_param, max_param, additional_params), date_partition_parameter=date_partition_parameter, arguments=( "--append_table", "--noreplace", ), dag=dag, docker_image=docker_image, ) upstream_task >> task if not parallel: upstream_task = task return dag