in dags/bqetl_backfill_complete.py [0:0]
def complete_backfill(backfill):
    """Wire up the tasks that complete a single backfill.

    Three tasks are created and chained in this order:

    1. a Slack post announcing that completion has started,
    2. a pod running ``script/bqetl backfill complete <table>``,
    3. a Slack post announcing that completion has finished.

    Args:
        backfill: The backfill entry handed to the ``@task`` helpers below.
            Those helpers read ``entry["watchers"]`` (iterated, each item
            split on ``"@"``) and ``entry["qualified_table_name"]``.
            NOTE(review): presumably a dict, or an XCom reference that
            resolves to one at run time - confirm against the caller.
    """

    # NOTE(review): the nested function names are left untouched because the
    # @task decorator may derive task ids from them - confirm before renaming.

    @task
    def prepare_slack_complete_message(entry):
        """Return the Slack text announcing that the completion step started."""
        # Keep only the part before the first "@" of every watcher and wrap
        # it in Slack mention syntax.
        # NOTE(review): assumes watchers are e-mail addresses whose local
        # part is a valid Slack mention target - confirm.
        mentions = [f"<@{address.split('@')[0]}>" for address in entry["watchers"]]
        mention_text = " ".join(mentions)
        table = entry["qualified_table_name"]
        return (
            f"{mention_text} :hourglass_flowing_sand: Completing backfill of `{table}` has started - currently swapping backfill data into production. "
            "A snapshot of the current production data will be kept as a backup for 30 days. "
            "You will receive another notification once the completing step is done."
        )

    initiate_text = prepare_slack_complete_message(backfill)
    notify_initiate = SlackAPIPostOperator(
        # Spelling ("initate") preserved verbatim so the task id is unchanged.
        task_id="slack_notify_initate",
        channel=AUTOMATION_SLACK_CHANNEL,
        username="Backfill",
        text=initiate_text,
        slack_conn_id=SLACK_CONNECTION_ID,
    )

    @task
    def prepare_pod_parameters(entry):
        """Return the single-element argument list passed to ``sh -cx``."""
        table = entry["qualified_table_name"]
        command = f"script/bqetl backfill complete {table}"
        return [command]

    pod_arguments = prepare_pod_parameters(backfill)
    process_backfill = GKEPodOperator(
        task_id="process_backfill",
        name="process_backfill",
        image=DOCKER_IMAGE,
        # The whole bqetl invocation is one string interpreted by the shell.
        cmds=["sh", "-cx"],
        arguments=pod_arguments,
        reattach_on_restart=True,
    )

    @task
    def prepare_slack_processing_complete_parameters(entry):
        """Return the Slack text announcing that the backfill is complete."""
        mentions = [f"<@{address.split('@')[0]}>" for address in entry["watchers"]]
        mention_text = " ".join(mentions)
        table = entry["qualified_table_name"]
        return f"{mention_text} :white_check_mark: Backfill is complete for `{table}`. Production data has been updated."

    complete_text = prepare_slack_processing_complete_parameters(backfill)
    notify_processing_complete = SlackAPIPostOperator(
        task_id="slack_notify_processing_complete",
        channel=AUTOMATION_SLACK_CHANNEL,
        username="Backfill",
        text=complete_text,
        slack_conn_id=SLACK_CONNECTION_ID,
    )

    # Announce the start, run the completion pod, then announce the end.
    notify_initiate >> process_backfill >> notify_processing_complete