"""
Deploy bigquery-etl artifacts.

This DAG is triggered by CircleCI on merges to the `main` branch and by Jenkins after [schemas deploys](https://mozilla-hub.atlassian.net/wiki/spaces/SRE/pages/27920974/BigQuery+shared-prod#BigQuery(shared-prod)-JenkinsJobs).
SQL generation can optionally run during the tasks using the `generate_sql` DAG param, which is used primarily by Jenkins.
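
For example, a run with SQL generation enabled can be triggered manually via the Airflow CLI
(illustrative invocation):
```
airflow dags trigger bqetl_artifact_deployment --conf '{"generate_sql": true}'
```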

*Triage notes*

The DAG always re-deploys all bqetl views, so as long as the most recent DAG run
is successful the job can be considered healthy and previous failed DAG runs
can be ignored.

`publish_views` doesn't show any logs in Airflow because its output contains invalid unicode.
Logs can be viewed in the [GCP logs explorer](https://console.cloud.google.com/logs?project=moz-fx-data-airflow-gke-prod)
with the following query:
```
resource.type="k8s_container"
resource.labels.project_id="moz-fx-data-airflow-gke-prod"
resource.labels.location="us-west1"
resource.labels.cluster_name="workloads-prod-v1"
resource.labels.namespace_name="default"
resource.labels.pod_name=~"publish-views-.+"
severity>=DEFAULT
```
This link leads to a prepopulated query for the last 12 hours: https://cloudlogging.app.goo.gl/vTs1R7fCMQnLxFtV8
To view logs of a specific task run, replace the `pod_name` value with the pod name shown
at the start of the task's Airflow logs, e.g. `INFO - Found matching pod publish-views-abcd1234`.
To find logs for specific datasets/views, add a string to search for to the end of the query.
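For example, appending a quoted search term (illustrative dataset name) narrows the results
to matching log lines:
```
"telemetry_derived"
```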
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import DagRun, Param
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.utils.state import DagRunState
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s

from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag

default_args = {
"owner": "ascholtz@mozilla.com",
"email": [
"ascholtz@mozilla.com",
"telemetry-alerts@mozilla.com",
],
"depends_on_past": False,
"start_date": datetime(2022, 12, 6),
"email_on_failure": True,
"email_on_retry": True,
"retries": 2,
"retry_delay": timedelta(minutes=30),
}

tags = [Tag.ImpactTier.tier_1]

params = {
"generate_sql": Param(
default=False,
type="boolean",
description="Run sql generation before each publish task",
),
}

# Renders the SQL generation command if params.generate_sql is true, else an empty string.
generate_sql_cmd_template = (
"{{ 'script/bqetl generate all --use-cloud-function=false && ' "
"if params.generate_sql else '' }}"
)
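# Illustrative rendered values:
#   params.generate_sql=true  -> "script/bqetl generate all --use-cloud-function=false && "
#   params.generate_sql=false -> ""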

# SQL generation currently requires ~4 GB of memory.
generate_sql_container_resources = k8s.V1ResourceRequirements(
requests={"memory": "{{ '5Gi' if params.generate_sql else '2Gi' }}"},
)


def should_run_deployment(dag_id: str, generate_sql: str) -> bool:
    """
    Run deploys if there are no other queued DAG runs or if the generate_sql param is true.

    generate_sql arrives as the rendered template string "True" or "False" rather than
    a boolean, because templated op_kwargs are rendered to strings.
    When used with ShortCircuitOperator, True means run downstream tasks and False means skip.
    """
    queued_runs = DagRun.find(dag_id=dag_id, state=DagRunState.QUEUED)
    print(f"Found {len(queued_runs)} queued dag runs for {dag_id}")
    return len(queued_runs) == 0 or generate_sql == "True"


bigeye_api_key_secret = Secret(
    deploy_type="env",
    deploy_target="BIGEYE_API_KEY",
    secret="airflow-gke-secrets",
    key="bqetl_artifact_deployment__bigeye_api_key",
)

with DAG(
"bqetl_artifact_deployment",
max_active_runs=1,
default_args=default_args,
schedule_interval=None,
doc_md=__doc__,
tags=tags,
params=params,
) as dag:
docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"

    skip_if_queued_runs_exist = ShortCircuitOperator(
task_id="skip_if_queued_runs_exist",
ignore_downstream_trigger_rules=True,
python_callable=should_run_deployment,
op_kwargs={"dag_id": dag.dag_id, "generate_sql": "{{ params.generate_sql }}"},
)

    publish_public_udfs = GKEPodOperator(
task_id="publish_public_udfs",
arguments=["script/publish_public_udfs"],
image=docker_image,
)

    publish_persistent_udfs = GKEPodOperator(
task_id="publish_persistent_udfs",
cmds=["bash", "-x", "-c"],
arguments=[
"script/publish_persistent_udfs --project-id=moz-fx-data-shared-prod && "
"script/publish_persistent_udfs --project-id=mozdata"
],
image=docker_image,
)

    publish_new_tables = GKEPodOperator(
task_id="publish_new_tables",
cmds=["bash", "-x", "-c"],
execution_timeout=timedelta(hours=2),
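        # For each project: initialize any new tables, then update and deploy table schemas.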
arguments=[
generate_sql_cmd_template
+ "script/bqetl query initialize '*' --skip-existing --project-id=moz-fx-data-shared-prod && "
"script/bqetl query initialize '*' --skip-existing --project-id=moz-fx-data-experiments && "
"script/bqetl query initialize '*' --skip-existing --project-id=moz-fx-data-marketing-prod && "
"script/bqetl query initialize '*' --skip-existing --project-id=moz-fx-data-bq-people && "
"script/bqetl query schema update '*' --use-cloud-function=false --ignore-dryrun-skip --project-id=moz-fx-data-shared-prod && "
"script/bqetl query schema deploy '*' --use-cloud-function=false --force --ignore-dryrun-skip --project-id=moz-fx-data-shared-prod && "
"script/bqetl query schema update '*' --use-cloud-function=false --ignore-dryrun-skip --project-id=moz-fx-data-experiments && "
"script/bqetl query schema deploy '*' --use-cloud-function=false --force --ignore-dryrun-skip --project-id=moz-fx-data-experiments && "
"script/bqetl query schema update '*' --use-cloud-function=false --ignore-dryrun-skip --project-id=moz-fx-data-marketing-prod && "
"script/bqetl query schema deploy '*' --use-cloud-function=false --force --ignore-dryrun-skip --project-id=moz-fx-data-marketing-prod && "
"script/bqetl query schema update '*' --use-cloud-function=false --ignore-dryrun-skip --project-id=moz-fx-glam-prod && "
"script/bqetl query schema deploy '*' --use-cloud-function=false --force --ignore-dryrun-skip --project-id=moz-fx-glam-prod && "
"script/bqetl query schema update '*' --use-cloud-function=false --ignore-dryrun-skip --project-id=moz-fx-data-bq-people && "
"script/bqetl query schema deploy '*' --use-cloud-function=false --force --ignore-dryrun-skip --project-id=moz-fx-data-bq-people"
],
image=docker_image,
container_resources=generate_sql_container_resources,
)

    publish_views = GKEPodOperator(
task_id="publish_views",
cmds=["bash", "-x", "-c"],
execution_timeout=timedelta(hours=2),
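        # Publish views per project, then clean up views that are no longer managed,
        # then publish the public data views.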
arguments=[
generate_sql_cmd_template
+ "script/bqetl view publish --add-managed-label --skip-authorized --project-id=moz-fx-data-shared-prod && "
"script/bqetl view publish --add-managed-label --skip-authorized --project-id=moz-fx-data-experiments && "
"script/bqetl view publish --add-managed-label --skip-authorized --project-id=moz-fx-data-marketing-prod && "
"script/bqetl view publish --add-managed-label --skip-authorized --project-id=moz-fx-glam-prod && "
"script/bqetl view publish --add-managed-label --skip-authorized --project-id=moz-fx-data-shared-prod --target-project=mozdata --user-facing-only && "
"script/bqetl view publish --add-managed-label --skip-authorized --project-id=moz-fx-data-bq-people && "
"script/bqetl view clean --skip-authorized --target-project=moz-fx-data-shared-prod && "
"script/bqetl view clean --skip-authorized --target-project=moz-fx-data-experiments --project-id=moz-fx-data-experiments && "
"script/bqetl view clean --skip-authorized --target-project=moz-fx-data-marketing-prod --project-id=moz-fx-data-marketing-prod && "
"script/bqetl view clean --skip-authorized --target-project=moz-fx-glam-prod --project-id=moz-fx-glam-prod && "
"script/bqetl view clean --skip-authorized --target-project=mozdata --user-facing-only && "
"script/bqetl view clean --skip-authorized --target-project=moz-fx-data-bq-people --project-id=moz-fx-data-bq-people && "
"script/publish_public_data_views --target-project=moz-fx-data-shared-prod && "
"script/publish_public_data_views --target-project=mozdata"
],
image=docker_image,
container_resources=generate_sql_container_resources,
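        # Log collection is disabled because the output contains invalid unicode;
        # see the module docstring for viewing logs in the GCP logs explorer.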
get_logs=False,
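        # Run even if an upstream publish task failed, so views are always re-deployed.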
trigger_rule=TriggerRule.ALL_DONE,
)

    publish_metadata = GKEPodOperator(
task_id="publish_metadata",
cmds=["bash", "-x", "-c"],
arguments=[
generate_sql_cmd_template
+ "script/bqetl metadata publish '*' --project_id=moz-fx-data-shared-prod && "
"script/bqetl metadata publish '*' --project_id=mozdata && "
"script/bqetl metadata publish '*' --project_id=moz-fx-data-marketing-prod && "
"script/bqetl metadata publish '*' --project_id=moz-fx-data-experiments && "
"script/bqetl metadata publish '*' --project_id=moz-fx-data-bq-people"
],
image=docker_image,
container_resources=generate_sql_container_resources,
)

    publish_bigeye_monitors = GKEPodOperator(
task_id="publish_bigeye_monitors",
cmds=["bash", "-x", "-c"],
arguments=[
"script/bqetl monitoring deploy '*' --project_id=moz-fx-data-shared-prod"
],
image=docker_image,
secrets=[bigeye_api_key_secret],
)
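
    # Task graph: skip_if_queued_runs_exist gates the three publish tasks, which all
    # feed publish_views; metadata and Bigeye monitors are published afterwards.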
skip_if_queued_runs_exist.set_downstream(
[
publish_public_udfs,
publish_persistent_udfs,
publish_new_tables,
]
)
publish_views.set_upstream(publish_public_udfs)
publish_views.set_upstream(publish_persistent_udfs)
publish_views.set_upstream(publish_new_tables)
publish_metadata.set_upstream(publish_views)
publish_bigeye_monitors.set_upstream(publish_views)