dags/merino_jobs.py

import datetime
from typing import Any

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.email import EmailOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import models as k8s

from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag

DOCS = """\
Merino Jobs

DAG for orchestrating jobs that build datasets that are used in Merino.
The jobs are run via the GKEPodOperator.
"""


def merino_job(
    name: str, arguments: list[str], env_vars: dict[str, Any] | None = None, **kwargs
):
    default_env_vars = {"MERINO_ENV": "production"}
    if env_vars is None:
        env_vars = {}
    default_env_vars.update(env_vars)

    return GKEPodOperator(
        task_id=name,
        name=name,
        image="mozilla/merino-py:latest",
        image_pull_policy="Always",
        project_id="moz-fx-data-airflow-gke-prod",
        gcp_conn_id="google_cloud_airflow_gke",
        cluster_name="workloads-prod-v1",
        location="us-west1",
        cmds=["python", "-m", "merino.jobs.cli"],
        arguments=arguments,
        # Needed for the increased number of domains the job has to process.
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "512Mi"},
        ),
        env_vars=default_env_vars,
        email=[
            "disco-team@mozilla.com",
            "wstuckey@mozilla.com",
        ],
        **kwargs,
    )


default_args = {
    "owner": "disco-team@mozilla.com",
    "start_date": datetime.datetime(2023, 2, 1),
    "email": ["disco-team@mozilla.com", "wstuckey@mozilla.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    "depends_on_past": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

# Low priority, no triage
tags = [
    Tag.ImpactTier.tier_3,
    Tag.Triage.no_triage,
]

elasticsearch_stage_apikey_secret = Secret(
    deploy_type="env",
    deploy_target="MERINO_JOBS__WIKIPEDIA_INDEXER__ES_API_KEY",
    secret="airflow-gke-secrets",
    key="merino_elasticsearch_secret__stage_api_key",
)

elasticsearch_prod_apikey_secret = Secret(
    deploy_type="env",
    deploy_target="MERINO_JOBS__WIKIPEDIA_INDEXER__ES_API_KEY",
    secret="airflow-gke-secrets",
    key="merino_elasticsearch_secret__prod_api_key",
)

# Run weekly on Tuesdays at 5am UTC
with DAG(
    "merino_jobs",
    schedule_interval="0 5 * * 2",
    doc_md=DOCS,
    default_args=default_args,
    tags=tags,
) as dag:
    es_prod_connection = BaseHook.get_connection("merino_elasticsearch_prod")
    es_staging_connection = BaseHook.get_connection("merino_elasticsearch_stage")

    wikipedia_indexer_copy_export = merino_job(
        "wikipedia_indexer_copy_export",
        arguments=[
            "wikipedia-indexer",
            "copy-export",
            "--gcs-path",
            "moz-fx-data-prod-external-data/contextual-services/merino-jobs/wikipedia-exports",
            "--gcp-project",
            "moz-fx-data-shared-prod",
        ],
    )

    wikipedia_indexer_build_index_for_staging = merino_job(
        "wikipedia_indexer_build_index_staging",
        arguments=[
            "wikipedia-indexer",
            "index",
            "--version",
            "v1",
            "--total-docs",
            "6600000",  # Estimate of the total number of documents in the Wikipedia index
            "--elasticsearch-url",
            str(es_staging_connection.host),
            "--gcs-path",
            "moz-fx-data-prod-external-data/contextual-services/merino-jobs/wikipedia-exports",
            "--gcp-project",
            "moz-fx-data-shared-prod",
        ],
        secrets=[elasticsearch_stage_apikey_secret],
    )

    wikipedia_indexer_build_index_for_prod = merino_job(
        "wikipedia_indexer_build_index_production",
        arguments=[
            "wikipedia-indexer",
            "index",
            "--version",
            "v1",
            "--total-docs",
            "6600000",  # Estimate of the total number of documents in the Wikipedia index
            "--elasticsearch-url",
            str(es_prod_connection.host),
            "--gcs-path",
            "moz-fx-data-prod-external-data/contextual-services/merino-jobs/wikipedia-exports",
            "--gcp-project",
            "moz-fx-data-shared-prod",
        ],
        secrets=[elasticsearch_prod_apikey_secret],
    )

    wikipedia_indexer_copy_export >> [
        wikipedia_indexer_build_index_for_staging,
        wikipedia_indexer_build_index_for_prod,
    ]

    # Navigational suggestions tasks.
    prepare_domain_metadata_stage = merino_job(
        "nav_suggestions_prepare_domain_metadata_stage",
        arguments=[
            "navigational-suggestions",
            "prepare-domain-metadata",
            "--src-gcp-project",
            "moz-fx-data-shared-prod",
            "--dst-gcp-project",
            "moz-fx-merino-nonprod-ee93",
            "--dst-gcs-bucket",
            "merino-images-stagepy",
            "--dst-cdn-hostname",
            "stagepy-images.merino.nonprod.cloudops.mozgcp.net",
            "--force-upload",
            "--write-xcom",
        ],
        do_xcom_push=True,
    )

    prepare_domain_metadata_prod = merino_job(
        "nav_suggestions_prepare_domain_metadata_prod",
        arguments=[
            "navigational-suggestions",
            "prepare-domain-metadata",
            "--src-gcp-project",
            "moz-fx-data-shared-prod",
            "--dst-gcp-project",
            "moz-fx-merino-prod-1c2f",
            "--dst-gcs-bucket",
            "merino-images-prodpy",
            "--dst-cdn-hostname",
            "merino-images.services.mozilla.com",
            "--force-upload",
            "--write-xcom",
        ],
        do_xcom_push=True,
    )

    on_domain_success = EmailOperator(
        task_id="email_on_domain_success",
        to=["disco-team@mozilla.com", "wstuckey@mozilla.com"],
        subject="Navigational Suggestions Domain Metadata job successful",
        html_content="""
        Job completed.
        Download the new top picks json file on GCS.

        Prod Top Pick JSON file:
        {{ task_instance.xcom_pull("nav_suggestions_prepare_domain_metadata_prod")["top_pick_url"] }}

        Stage Top Pick JSON file:
        {{ task_instance.xcom_pull("nav_suggestions_prepare_domain_metadata_stage")["top_pick_url"] }}

        Change Summary:
        {{ task_instance.xcom_pull("nav_suggestions_prepare_domain_metadata_prod")["diff"] }}
        """,
    )

    [prepare_domain_metadata_stage, prepare_domain_metadata_prod] >> on_domain_success