# dags/merino_jobs.py

import datetime
from typing import Any

from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.operators.email import EmailOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import models as k8s

from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag

DOCS = """\
Merino Jobs
Dag for orchestrating jobs that build datasets that are used in Merino.
The jobs are run via the GKEPodOperator
"""
def merino_job(
    name: str, arguments: list[str], env_vars: dict[str, Any] | None = None, **kwargs
):
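    """Create a GKEPodOperator task that runs a merino-py job via its CLI.

    Job-specific env vars are merged on top of the defaults (MERINO_ENV=production),
    and any extra keyword arguments are passed through to the GKEPodOperator.
    """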
    default_env_vars = {"MERINO_ENV": "production"}
    if env_vars is None:
        env_vars = {}
    default_env_vars.update(env_vars)

    return GKEPodOperator(
        task_id=name,
        name=name,
        image="mozilla/merino-py:latest",
        image_pull_policy="Always",
        project_id="moz-fx-data-airflow-gke-prod",
        gcp_conn_id="google_cloud_airflow_gke",
        cluster_name="workloads-prod-v1",
        location="us-west1",
        cmds=["python", "-m", "merino.jobs.cli"],
        arguments=arguments,
        # Increased memory request needed for the large number of domains
        # the jobs have to process.
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "512Mi"},
        ),
        env_vars=default_env_vars,
        email=[
            "disco-team@mozilla.com",
            "wstuckey@mozilla.com",
        ],
        **kwargs,
    )


default_args = {
    "owner": "disco-team@mozilla.com",
    "start_date": datetime.datetime(2023, 2, 1),
    "email": ["disco-team@mozilla.com", "wstuckey@mozilla.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    "depends_on_past": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
}

# Low priority, no triage
tags = [
    Tag.ImpactTier.tier_3,
    Tag.Triage.no_triage,
]

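# Elasticsearch API keys for the Wikipedia indexer, injected into the job pods as
# the MERINO_JOBS__WIKIPEDIA_INDEXER__ES_API_KEY environment variable from the
# airflow-gke-secrets Kubernetes secret.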
elasticsearch_stage_apikey_secret = Secret(
    deploy_type="env",
    deploy_target="MERINO_JOBS__WIKIPEDIA_INDEXER__ES_API_KEY",
    secret="airflow-gke-secrets",
    key="merino_elasticsearch_secret__stage_api_key",
)

elasticsearch_prod_apikey_secret = Secret(
    deploy_type="env",
    deploy_target="MERINO_JOBS__WIKIPEDIA_INDEXER__ES_API_KEY",
    secret="airflow-gke-secrets",
    key="merino_elasticsearch_secret__prod_api_key",
)

# Run weekly on Tuesdays at 5am UTC
with DAG(
    "merino_jobs",
    schedule_interval="0 5 * * 2",
    doc_md=DOCS,
    default_args=default_args,
    tags=tags,
) as dag:
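    # The Elasticsearch cluster URLs come from Airflow connections; only the host
    # is passed to the indexer tasks, while the API keys are supplied via the
    # Kubernetes secrets defined above.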
    es_prod_connection = BaseHook.get_connection("merino_elasticsearch_prod")
    es_staging_connection = BaseHook.get_connection("merino_elasticsearch_stage")

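    # Copy the latest Wikipedia export into GCS so the indexing tasks below can
    # read it.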
    wikipedia_indexer_copy_export = merino_job(
        "wikipedia_indexer_copy_export",
        arguments=[
            "wikipedia-indexer",
            "copy-export",
            "--gcs-path",
            "moz-fx-data-prod-external-data/contextual-services/merino-jobs/wikipedia-exports",
            "--gcp-project",
            "moz-fx-data-shared-prod",
        ],
    )

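    # Build the Elasticsearch index from the copied export, once for the staging
    # cluster and once for the production cluster.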
    wikipedia_indexer_build_index_for_staging = merino_job(
        "wikipedia_indexer_build_index_staging",
        arguments=[
            "wikipedia-indexer",
            "index",
            "--version",
            "v1",
            "--total-docs",
            "6600000",  # Estimate of the total number of documents in the Wikipedia index
            "--elasticsearch-url",
            str(es_staging_connection.host),
            "--gcs-path",
            "moz-fx-data-prod-external-data/contextual-services/merino-jobs/wikipedia-exports",
            "--gcp-project",
            "moz-fx-data-shared-prod",
        ],
        secrets=[elasticsearch_stage_apikey_secret],
    )

    wikipedia_indexer_build_index_for_prod = merino_job(
        "wikipedia_indexer_build_index_production",
        arguments=[
            "wikipedia-indexer",
            "index",
            "--version",
            "v1",
            "--total-docs",
            "6600000",  # Estimate of the total number of documents in the Wikipedia index
            "--elasticsearch-url",
            str(es_prod_connection.host),
            "--gcs-path",
            "moz-fx-data-prod-external-data/contextual-services/merino-jobs/wikipedia-exports",
            "--gcp-project",
            "moz-fx-data-shared-prod",
        ],
        secrets=[elasticsearch_prod_apikey_secret],
    )

    wikipedia_indexer_copy_export >> [
        wikipedia_indexer_build_index_for_staging,
        wikipedia_indexer_build_index_for_prod,
    ]

    # Navigational suggestions tasks: prepare domain metadata and upload it to GCS
    # for the stage and prod Merino environments.
    prepare_domain_metadata_stage = merino_job(
        "nav_suggestions_prepare_domain_metadata_stage",
        arguments=[
            "navigational-suggestions",
            "prepare-domain-metadata",
            "--src-gcp-project",
            "moz-fx-data-shared-prod",
            "--dst-gcp-project",
            "moz-fx-merino-nonprod-ee93",
            "--dst-gcs-bucket",
            "merino-images-stagepy",
            "--dst-cdn-hostname",
            "stagepy-images.merino.nonprod.cloudops.mozgcp.net",
            "--force-upload",
            "--write-xcom",
        ],
        do_xcom_push=True,
    )

    prepare_domain_metadata_prod = merino_job(
        "nav_suggestions_prepare_domain_metadata_prod",
        arguments=[
            "navigational-suggestions",
            "prepare-domain-metadata",
            "--src-gcp-project",
            "moz-fx-data-shared-prod",
            "--dst-gcp-project",
            "moz-fx-merino-prod-1c2f",
            "--dst-gcs-bucket",
            "merino-images-prodpy",
            "--dst-cdn-hostname",
            "merino-images.services.mozilla.com",
            "--force-upload",
            "--write-xcom",
        ],
        do_xcom_push=True,
    )

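    # Email the team links to the newly generated top picks files (pulled from the
    # XComs written by the two tasks above) once both domain metadata jobs succeed.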
    on_domain_success = EmailOperator(
        task_id="email_on_domain_success",
        to=["disco-team@mozilla.com", "wstuckey@mozilla.com"],
        subject="Navigational Suggestions Domain Metadata job successful",
        html_content="""
        Job completed. Download the new top picks json file on GCS.
        Prod Top Pick JSON file: {{ task_instance.xcom_pull("nav_suggestions_prepare_domain_metadata_prod")["top_pick_url"] }}
        Stage Top Pick JSON file: {{ task_instance.xcom_pull("nav_suggestions_prepare_domain_metadata_stage")["top_pick_url"] }}
        Change Summary:
        {{ task_instance.xcom_pull("nav_suggestions_prepare_domain_metadata_prod")["diff"] }}
        """,
    )

    [prepare_domain_metadata_stage, prepare_domain_metadata_prod] >> on_domain_success