# dags/bhr_collection.py
"""
A processing job on top of BHR (Background Hang Reporter) pings.
More information about the pings: https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/data/backgroundhangmonitor-ping.html
BHR is related to the Background Hang Monitor in Firefox Desktop.
See: [bug 1675103](https://bugzilla.mozilla.org/show_bug.cgi?id=1675103)
The [job source](https://github.com/mozilla/python_mozetl/blob/main/mozetl/bhr_collection)
is maintained in the mozetl repository.
* Migrated from Databricks and now running as a scheduled Dataproc task. *
The resulting aggregations are used by the following service:
https://fqueze.github.io/hang-stats/#date=[DATE]&row=0
"""
import datetime

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.dataproc import get_dataproc_parameters, moz_dataproc_pyspark_runner
from utils.tags import Tag

default_args = {
"owner": "bewu@mozilla.com",
"depends_on_past": False,
"start_date": datetime.datetime(2020, 11, 26),
"email": [
"telemetry-alerts@mozilla.com",
"kik@mozilla.com",
"dothayer@mozilla.com",
"bewu@mozilla.com",
],
"email_on_failure": True,
"email_on_retry": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=30),
}

tags = [Tag.ImpactTier.tier_1]

with DAG(
"bhr_collection",
default_args=default_args,
schedule_interval="0 5 * * *",
doc_md=__doc__,
tags=tags,
) as dag:
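    # Wait for the upstream copy_deduplicate DAG to land the day's BHR pings.
    # The 4-hour execution_delta matches the offset between this DAG's
    # 5:00 UTC schedule and copy_deduplicate's 1:00 UTC run.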
wait_for_bhr_ping = ExternalTaskSensor(
task_id="wait_for_copy_deduplicate",
external_dag_id="copy_deduplicate",
external_task_id="copy_deduplicate_all",
execution_delta=datetime.timedelta(hours=4),
check_existence=True,
mode="reschedule",
allowed_states=ALLOWED_STATES,
failed_states=FAILED_STATES,
pool="DATA_ENG_EXTERNALTASKSENSOR",
email_on_retry=False,
dag=dag,
)
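
    # Dataproc cluster and job configuration shared by the two collection runs
    # below; each run supplies its own num_workers and py_args.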
params = get_dataproc_parameters("google_cloud_airflow_dataproc")
shared_runner_args = {
"parent_dag_name": dag.dag_id,
"image_version": "1.5-debian10",
"default_args": default_args,
"python_driver_code": "https://raw.githubusercontent.com/mozilla/python_mozetl/main/mozetl/bhr_collection/bhr_collection.py",
"init_actions_uris": [
"gs://dataproc-initialization-actions/python/pip-install.sh"
],
"additional_metadata": {
"PIP_PACKAGES": "boto3==1.16.20 click==7.1.2 google-cloud-storage==2.7.0"
},
"additional_properties": {
"spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
"spark:spark.driver.memory": "30g",
"spark:spark.executor.memory": "20g",
},
"idle_delete_ttl": 14400,
        # supported machine types depend on the dataproc image version:
# https://cloud.google.com/dataproc/docs/concepts/compute/supported-machine-types
"master_machine_type": "n2-highmem-8",
"worker_machine_type": "n2-highmem-4",
"gcp_conn_id": params.conn_id,
"service_account": params.client_email,
"storage_bucket": params.storage_bucket,
}
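
    # Main-process run: collect hangs from the parent-process "Gecko" thread,
    # sampling 50% of pings.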
bhr_collection = SubDagOperator(
task_id="bhr_collection",
dag=dag,
subdag=moz_dataproc_pyspark_runner(
dag_name="bhr_collection",
cluster_name="bhr-collection-main-{{ ds }}",
job_name="bhr-collection-main",
**shared_runner_args,
num_workers=6,
py_args=[
"--date",
"{{ ds }}",
"--sample-size",
"0.5",
"--use_gcs",
"--thread-filter",
"Gecko",
"--output-tag",
"main",
],
),
)
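
    # Child-process run: same job against the "Gecko_Child" thread, with a
    # larger cluster and a smaller sample to handle the higher hang volume.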
bhr_collection_child = SubDagOperator(
task_id="bhr_collection_child",
dag=dag,
subdag=moz_dataproc_pyspark_runner(
dag_name="bhr_collection_child",
cluster_name="bhr-collection-child-{{ ds }}",
job_name="bhr-collection-child",
**shared_runner_args,
num_workers=12,
py_args=[
"--date",
"{{ ds }}",
"--sample-size",
"0.08", # there are usually 12-15x more hangs in the child process than main
"--use_gcs",
"--thread-filter",
"Gecko_Child",
"--output-tag",
"child",
],
),
)
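
    # Both collection jobs run in parallel once the deduplicated pings land.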
wait_for_bhr_ping >> [
bhr_collection,
bhr_collection_child,
]