dags/bqetl_backfill_complete.py (81 lines of code) (raw):

"""DAG for completing registered bigquery-etl backfills.""" from datetime import datetime from airflow import DAG from airflow.decorators import task, task_group from airflow.providers.slack.operators.slack import SlackAPIPostOperator from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag AUTOMATION_SLACK_CHANNEL = "#dataops-alerts" SLACK_CONNECTION_ID = "overwatch_slack" DOCKER_IMAGE = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest" tags = [Tag.ImpactTier.tier_3] default_args = { "email": [ "ascholtz@mozilla.com", "bewu@mozilla.com", "wichan@mozilla.com", ] } with DAG( "bqetl_backfill_complete", doc_md=__doc__, tags=tags, schedule_interval="@hourly", start_date=datetime(2024, 1, 1), catchup=False, default_args=default_args, ) as dag: detect_backfills = GKEPodOperator( task_id="detect_backfills", name="detect_backfills", cmds=["sh", "-cx"], arguments=[ "script/bqetl backfill scheduled --status=Complete --json_path=/airflow/xcom/return.json --ignore-old-entries", ], image=DOCKER_IMAGE, do_xcom_push=True, ) @task_group def complete_backfill(backfill): @task def prepare_slack_complete_message(entry): watcher_text = " ".join( f"<@{watcher.split('@')[0]}>" for watcher in entry["watchers"] ) return ( f"{watcher_text} :hourglass_flowing_sand: Completing backfill of `{entry['qualified_table_name']}` has started - currently swapping backfill data into production. " f"A snapshot of the current production data will be kept as a backup for 30 days. " f"You will receive another notification once the completing step is done." ) notify_initiate = SlackAPIPostOperator( task_id="slack_notify_initate", username="Backfill", slack_conn_id=SLACK_CONNECTION_ID, text=prepare_slack_complete_message(backfill), channel=AUTOMATION_SLACK_CHANNEL, ) @task def prepare_pod_parameters(entry): return [f"script/bqetl backfill complete { entry['qualified_table_name'] }"] process_backfill = GKEPodOperator( task_id="process_backfill", name="process_backfill", cmds=["sh", "-cx"], arguments=prepare_pod_parameters(backfill), image=DOCKER_IMAGE, reattach_on_restart=True, ) @task def prepare_slack_processing_complete_parameters(entry): watcher_text = " ".join( f"<@{watcher.split('@')[0]}>" for watcher in entry["watchers"] ) return f"{watcher_text} :white_check_mark: Backfill is complete for `{entry['qualified_table_name']}`. Production data has been updated." notify_processing_complete = SlackAPIPostOperator( task_id="slack_notify_processing_complete", username="Backfill", slack_conn_id=SLACK_CONNECTION_ID, text=prepare_slack_processing_complete_parameters(backfill), channel=AUTOMATION_SLACK_CHANNEL, ) notify_initiate >> process_backfill >> notify_processing_complete backfill_groups = complete_backfill.expand(backfill=detect_backfills.output)