dags/mad_server.py (117 lines of code) (raw):

""" Malicious Addons Detection. This runs once a week to emit a trained model to GCS. Source code is in the private [mad-server repository](https://github.com/mozilla/mad-server/). *Triage notes* The way the app was designed it is decoupled from Airflow and will pull all data since the last successful data pull. What this means if we have a failed DAG run followed by a successful DAG run it will cover the data from the previous run. So as long as the most recent DAG run is successful the job can be considered healthy and not action is required for failed DAG runs. """ from datetime import datetime, timedelta from airflow import DAG from airflow.providers.cncf.kubernetes.secret import Secret from operators.gcp_container_operator import GKEPodOperator from utils.tags import Tag default_args = { "owner": "dzeber@mozilla.com", "depends_on_past": False, "start_date": datetime(2021, 4, 15), "email_on_failure": True, "email_on_retry": True, "retries": 1, "retry_delay": timedelta(minutes=30), } tags = [Tag.ImpactTier.tier_3] gcs_bucket = "mad-resources-training" gcs_root_training = "datasets" cloud_service = "GCS" customs_training_allow_overwrite = "True" gcloud_project = "mad-model-training" gcs_report_bucket = "mad-reports" amo_cred_issuer_secret = Secret( deploy_type="env", deploy_target="AMO_CRED_ISSUER", secret="airflow-gke-secrets", key="mad_server_secret__amo_cred_issuer", ) amo_cred_secret_secret = Secret( deploy_type="env", deploy_target="AMO_CRED_SECRET", secret="airflow-gke-secrets", key="mad_server_secret__amo_cred_secret", ) with DAG( "mad_server", default_args=default_args, schedule_interval="@weekly", doc_md=__doc__, tags=tags, ) as dag: mad_server_pull = GKEPodOperator( task_id="mad_server_pull", # Controls the entrypoint of the container, which for mad-server # defaults to bin/run rather than a shell. cmds=[ "/bin/bash", ], arguments=[ "bin/airflow-pull", ], image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest", startup_timeout_seconds=500, gcp_conn_id="google_cloud_airflow_gke", env_vars={ "GCS_BUCKET": gcs_bucket, "GCS_ROOT_TRAINING": gcs_root_training, "CLOUD_SERVICE": cloud_service, "CUSTOMS_TRAINING_ALLOW_OVERWRITE": customs_training_allow_overwrite, }, email=[ "dzeber@mozilla.com", "gleonard@mozilla.com", ], secrets=[amo_cred_issuer_secret, amo_cred_secret_secret], ) mad_train_model = GKEPodOperator( task_id="train_model", cmds=[ "/bin/bash", ], arguments=[ "bin/train_model", "--publish", "--publish-as-latest", "./working", ], image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest", startup_timeout_seconds=500, env_vars={ "GCS_BUCKET": gcs_bucket, "GCS_ROOT_TRAINING": gcs_root_training, "CLOUD_SERVICE": cloud_service, "CUSTOMS_TRAINING_ALLOW_OVERWRITE": customs_training_allow_overwrite, "GCLOUD_PROJECT": gcloud_project, "GCS_REPORT_BUCKET": gcs_report_bucket, }, email=[ "dzeber@mozilla.com", "gleonard@mozilla.com", ], ) new_data_eval = GKEPodOperator( task_id="evaluate_new_data", cmds=[ "/bin/bash", ], arguments=[ "bin/evaluate_new_data", "--publish", "--publish-as-latest", "./working", ], image="us-west1-docker.pkg.dev/moz-fx-data-airflow-prod-88e0/data-science-artifacts/mad-server:latest", startup_timeout_seconds=500, gcp_conn_id="google_cloud_airflow_gke", env_vars={ "GCS_BUCKET": gcs_bucket, "GCS_ROOT_TRAINING": gcs_root_training, "CLOUD_SERVICE": cloud_service, "CUSTOMS_TRAINING_ALLOW_OVERWRITE": customs_training_allow_overwrite, "GCLOUD_PROJECT": gcloud_project, "GCS_REPORT_BUCKET": gcs_report_bucket, }, email=[ "dzeber@mozilla.com", "gleonard@mozilla.com", ], ) mad_server_pull >> mad_train_model >> new_data_eval