dags/glam_glean_imports.py

"""Desktop ETL for importing glean data into GLAM app.""" from datetime import datetime, timedelta from airflow import DAG from airflow.models import Variable from airflow.providers.cncf.kubernetes.secret import Secret from airflow.sensors.external_task import ExternalTaskSensor from airflow.utils.task_group import TaskGroup from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag default_args = { "owner": "efilho@mozilla.com", "depends_on_past": False, "start_date": datetime(2019, 10, 22), "email": [ "akommasani@mozilla.com", "akomarzewski@mozilla.com", "efilho@mozilla.com", ], "email_on_failure": True, "email_on_retry": True, "retries": 1, "retry_delay": timedelta(minutes=30), } tags = [Tag.ImpactTier.tier_2] dag = DAG( "glam_glean_imports", default_args=default_args, schedule_interval="0 19 * * *", doc_md=__doc__, tags=tags, ) wait_for_glam = ExternalTaskSensor( task_id="wait_for_glam", external_dag_id="glam", external_task_group_id="extracts", execution_delta=timedelta(hours=3), check_existence=True, mode="reschedule", allowed_states=ALLOWED_STATES, failed_states=FAILED_STATES, pool="DATA_ENG_EXTERNALTASKSENSOR", email_on_retry=False, dag=dag, ) # Move logic from Glam deployment's GKE Cronjob to this dag for better dependency timing default_glean_import_image = "gcr.io/moz-fx-dataops-images-global/gcp-pipelines/glam/glam-production/glam:2023.07.1-43" base_docker_args = ["/venv/bin/python", "manage.py"] for env in ["Dev", "Prod"]: glean_import_image = default_glean_import_image if env == "Dev": # noqa SIMM114 glean_import_image = "gcr.io/moz-fx-dataops-images-global/gcp-pipelines/glam/glam-production/glam:2023.07.1-43" elif env == "Prod": glean_import_image = "gcr.io/moz-fx-dataops-images-global/gcp-pipelines/glam/glam-production/glam:2023.07.1-43" # Fetch secrets from Google Secret Manager to be injected into the pod. database_url_secret = Secret( deploy_type="env", deploy_target="DATABASE_URL", secret="airflow-gke-secrets", key=f"{env}_glam_secret__database_url", ) django_secret = Secret( deploy_type="env", deploy_target="DJANGO_SECRET_KEY", secret="airflow-gke-secrets", key=f"{env}_glam_secret__django_secret_key", ) env_vars = { # Tells Django what set of configs to load depending on the environment. Defaults to dev on the app. "DJANGO_CONFIGURATION": env, "DJANGO_DEBUG": "False", "DJANGO_SETTINGS_MODULE": "glam.settings", "GOOGLE_CLOUD_PROJECT": Variable.get(env + "_glam_project"), } default_glam_import_image = "gcr.io/moz-fx-dataops-images-global/gcp-pipelines/glam/glam-production/glam:2024.10.0-58" base_docker_args = ["/venv/bin/python", "manage.py"] for env in ["Dev", "Prod"]: glam_import_image = default_glam_import_image if env == "Dev": # noqa 114 glam_import_image = "gcr.io/moz-fx-dataops-images-global/gcp-pipelines/glam/glam-production/glam:2024.10.0-58" elif env == "Prod": glam_import_image = "gcr.io/moz-fx-dataops-images-global/gcp-pipelines/glam/glam-production/glam:2024.10.0-58" # Fetch secrets from Google Secret Manager to be injected into the pod. 
    database_url_secret = Secret(
        deploy_type="env",
        deploy_target="DATABASE_URL",
        secret="airflow-gke-secrets",
        key=f"{env}_glam_secret__database_url",
    )
    django_secret = Secret(
        deploy_type="env",
        deploy_target="DJANGO_SECRET_KEY",
        secret="airflow-gke-secrets",
        key=f"{env}_glam_secret__django_secret_key",
    )

    env_vars = {
        # Tells Django what set of configs to load depending on the environment. Defaults to dev on the app.
        "DJANGO_CONFIGURATION": env,
        "DJANGO_DEBUG": "False",
        "DJANGO_SETTINGS_MODULE": "glam.settings",
        "GOOGLE_CLOUD_PROJECT": Variable.get(env + "_glam_project"),
    }

    with dag as dag, TaskGroup(group_id=env + "_glam") as glam_env_task_group:
        glam_import_probes = GKEPodOperator(
            reattach_on_restart=True,
            task_id="glam_import_probes",
            name="glam_import_probes",
            image=glam_import_image,
            arguments=[*base_docker_args, "import_probes"],
            env_vars=env_vars,
            secrets=[database_url_secret, django_secret],
        )

        glam_import_revisions = GKEPodOperator(
            reattach_on_restart=True,
            task_id="glam_import_revisions",
            name="glam_import_revisions",
            image=glam_import_image,
            arguments=[*base_docker_args, "import_revisions"],
            env_vars=env_vars,
            secrets=[database_url_secret, django_secret],
        )

    wait_for_glam >> glam_env_task_group