dags/ltv.py:

""" Client Lifetime Value. Kicks off jobs to run on a Dataproc cluster. The job code lives in [jobs/ltv_daily.py](https://github.com/mozilla/telemetry-airflow/blob/main/jobs/ltv_daily.py). See [client_ltv docs on DTMO](https://docs.telemetry.mozilla.org/datasets/search/client_ltv/reference.html). """ from datetime import datetime, timedelta from airflow import DAG from airflow.operators.subdag import SubDagOperator from airflow.sensors.external_task import ExternalTaskSensor from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.dataproc import ( copy_artifacts_dev, get_dataproc_parameters, moz_dataproc_pyspark_runner, ) from utils.gcp import bigquery_etl_query from utils.tags import Tag default_args = { "owner": "akomar@mozilla.com", "depends_on_past": True, "start_date": datetime(2020, 3, 15), "email": [ "telemetry-alerts@mozilla.com", "akomar@mozilla.com", ], "email_on_failure": True, "email_on_retry": True, "retries": 3, "retry_delay": timedelta(minutes=30), } tags = [Tag.ImpactTier.tier_2] dag = DAG( "ltv_daily", default_args=default_args, schedule_interval="0 4 * * *", doc_md=__doc__, tags=tags, ) params = get_dataproc_parameters("google_cloud_airflow_dataproc") subdag_args = default_args.copy() subdag_args["retries"] = 0 task_id = "ltv_daily" project = params.project_id if params.is_dev else "moz-fx-data-shared-prod" ltv_daily = SubDagOperator( task_id=task_id, dag=dag, subdag=moz_dataproc_pyspark_runner( parent_dag_name=dag.dag_id, dag_name=task_id, job_name="ltv-daily", cluster_name="ltv-daily-{{ ds_nodash }}", idle_delete_ttl=600, num_workers=30, worker_machine_type="n2-standard-16", optional_components=["ANACONDA"], init_actions_uris=[ "gs://dataproc-initialization-actions/python/pip-install.sh" ], additional_properties={ "spark:spark.jars": "gs://spark-lib/bigquery/spark-bigquery-latest.jar" }, additional_metadata={"PIP_PACKAGES": "lifetimes==0.11.1"}, python_driver_code=f"gs://{params.artifact_bucket}/jobs/ltv_daily.py", py_args=[ "--submission-date", "{{ ds }}", "--prediction-days", "364", "--project-id", project, "--source-qualified-table-id", f"{project}.search.search_rfm", "--dataset-id", "analysis", "--intermediate-table-id", "ltv_daily_temporary_search_rfm_day", "--model-input-table-id", "ltv_daily_model_perf", "--model-output-table-id", "ltv_daily", "--temporary-gcs-bucket", params.storage_bucket, ], gcp_conn_id=params.conn_id, service_account=params.client_email, artifact_bucket=params.artifact_bucket, storage_bucket=params.storage_bucket, default_args=subdag_args, ), ) if params.is_dev: copy_to_dev = copy_artifacts_dev( dag, params.project_id, params.artifact_bucket, params.storage_bucket ) copy_to_dev >> ltv_daily else: wait_for_search_clients_last_seen = ExternalTaskSensor( task_id="wait_for_search_clients_last_seen", external_dag_id="bqetl_search", external_task_id="search_derived__search_clients_last_seen__v1", execution_delta=timedelta(hours=1), check_existence=True, mode="reschedule", allowed_states=ALLOWED_STATES, failed_states=FAILED_STATES, pool="DATA_ENG_EXTERNALTASKSENSOR", email_on_retry=False, dag=dag, ) wait_for_search_clients_last_seen >> ltv_daily ltv_revenue_join = bigquery_etl_query( task_id="ltv_revenue_join", destination_table="client_ltv_v1", dataset_id="revenue_derived", project_id="moz-fx-data-shared-prod", arguments=( "--clustering_fields=engine,country", "--schema_update_option=ALLOW_FIELD_ADDITION", "--schema_update_option=ALLOW_FIELD_RELAXATION", "--time_partitioning_type=DAY", "--time_partitioning_field=submission_date", ), 
dag=dag, ) ltv_daily >> ltv_revenue_join
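
Because the dev/prod branch above changes the task graph at parse time, a cheap guard against import and wiring mistakes is a DagBag smoke test. The following is a minimal sketch, assuming pytest, a local Airflow install, and the repo root on PYTHONPATH; the test module and its name are hypothetical, not part of the repo.

tests/test_ltv_dag.py (hypothetical):

# A minimal DagBag smoke test: verifies the file imports cleanly and that
# the revenue join is wired downstream of the Dataproc task.
from airflow.models import DagBag


def test_ltv_daily_dag_loads():
    # Parse only this repo's dags/ folder, skipping Airflow's bundled examples.
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert not dag_bag.import_errors

    dag = dag_bag.get_dag("ltv_daily")
    assert dag is not None
    # ltv_revenue_join must run only after the Dataproc job completes.
    assert "ltv_revenue_join" in dag.get_task("ltv_daily").downstream_task_ids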