utils/glam_subdags/generate_query.py

from operators.gcp_container_operator import GKEPodOperator


def generate_and_run_desktop_query(
    task_id,
    project_id,
    billing_project_id,
    source_dataset_id,
    sample_size,
    overwrite,
    probe_type,
    reattach_on_restart=False,
    destination_dataset_id=None,
    process=None,
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    **kwargs,
):
    """
    Generate and run Firefox desktop queries.

    :param task_id: Airflow task id
    :param project_id: GCP project to write to
    :param billing_project_id: Project to run the query in and bill to
    :param source_dataset_id: BigQuery dataset to read from in queries
    :param sample_size: Value to use for Windows release client sampling
    :param overwrite: Overwrite the destination table
    :param probe_type: Probe type to generate the query for
    :param reattach_on_restart: Reattach to an existing pod when the task restarts
    :param destination_dataset_id: BigQuery dataset to write results to.
        Defaults to source_dataset_id
    :param process: Process to filter probes for. Gets all processes by default.
    :param docker_image: Docker image
    :param kwargs: Additional keyword arguments passed through to GKEPodOperator
        (e.g. gcp_conn_id)
    """
    if destination_dataset_id is None:
        destination_dataset_id = source_dataset_id

    env_vars = {
        "PROJECT": project_id,
        "BILLING_PROJECT": billing_project_id,
        "PROD_DATASET": source_dataset_id,
        "DATASET": destination_dataset_id,
        "SUBMISSION_DATE": "{{ ds }}",
        "RUN_QUERY": "t",
    }
    if not overwrite:
        env_vars["APPEND"] = "t"

    command = [
        "script/glam/generate_and_run_desktop_sql",
        probe_type,
        sample_size,
    ]
    if process is not None:
        command.append(process)

    return GKEPodOperator(
        reattach_on_restart=reattach_on_restart,
        task_id=task_id,
        cmds=["bash"],
        env_vars=env_vars,
        arguments=command,
        image=docker_image,
        is_delete_operator_pod=False,
        **kwargs,
    )


def generate_and_run_glean_queries(
    task_id,
    product,
    destination_project_id,
    destination_dataset_id="glam_etl",
    source_project_id="moz-fx-data-shared-prod",
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    env_vars=None,
    **kwargs,
):
    """
    Generate and run fog and fenix queries.

    :param task_id: Airflow task id
    :param product: Product name of the glean app
    :param destination_project_id: Project to store derived tables in
    :param destination_dataset_id: Name of the dataset to store derived tables in
    :param source_project_id: Project containing the source datasets
    :param docker_image: Docker image
    :param env_vars: Additional environment variables to pass
    :param kwargs: Additional keyword arguments passed through to GKEPodOperator
        (e.g. gcp_conn_id)
    """
    if env_vars is None:
        env_vars = {}
    env_vars = {
        "PRODUCT": product,
        "SRC_PROJECT": source_project_id,
        "PROJECT": destination_project_id,
        "DATASET": destination_dataset_id,
        "SUBMISSION_DATE": "{{ ds }}",
        **env_vars,
    }

    return GKEPodOperator(
        reattach_on_restart=True,
        task_id=task_id,
        cmds=["bash", "-c"],
        env_vars=env_vars,
        arguments=["script/glam/generate_glean_sql && script/glam/run_glam_sql"],
        image=docker_image,
        is_delete_operator_pod=False,
        **kwargs,
    )
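
# Illustrative usage sketch (not part of the original module). The DAG id,
# schedule, and argument values below are assumptions chosen only to show how
# generate_and_run_glean_queries might be wired into a DAG:
#
#     from datetime import datetime
#     from airflow import DAG
#
#     with DAG(
#         "glam_glean_example",                          # assumed DAG id
#         start_date=datetime(2024, 1, 1),
#         schedule_interval="@daily",
#     ) as dag:
#         fenix_etl = generate_and_run_glean_queries(
#             task_id="fenix_glam_etl",                  # assumed task id
#             product="org_mozilla_fenix_glam_nightly",  # assumed product name
#             destination_project_id="example-project",  # assumed project
#         )
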

def generate_and_run_glean_task(
    task_type,
    task_name,
    product,
    destination_project_id,
    destination_dataset_id="glam_etl",
    source_project_id="moz-fx-data-shared-prod",
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
    env_vars=None,
    min_sample_id=0,
    max_sample_id=99,
    replace_table=True,
    **kwargs,
):
    """
    Generate and run a single glean task.

    See https://github.com/mozilla/bigquery-etl/blob/main/script/glam/run_glam_sql.

    :param task_type: Either view, init, or query
    :param task_name: Name of the task, from which the query name is derived
    :param product: Product name of the glean app
    :param destination_project_id: Project to store derived tables in
    :param destination_dataset_id: Name of the dataset to store derived tables in
    :param source_project_id: Project containing the source datasets
    :param docker_image: Docker image
    :param env_vars: Additional environment variables to pass
    :param min_sample_id: Lowest sample_id to include (inclusive)
    :param max_sample_id: Highest sample_id to include (inclusive)
    :param replace_table: Replace the destination table instead of appending to it
    :param kwargs: Additional keyword arguments passed through to GKEPodOperator
        (e.g. gcp_conn_id)
    """
    if env_vars is None:
        env_vars = {}
    query_name = task_name.split("_sampled_")[0]
    # If a range smaller than 100% of the samples is being used, sample_id is needed.
    use_sample_id = min_sample_id > 0 or max_sample_id < 99
    write_preference = "--replace" if replace_table else "'--append_table --noreplace'"
    env_vars = {
        "PRODUCT": product,
        "SRC_PROJECT": source_project_id,
        "PROJECT": destination_project_id,
        "DATASET": destination_dataset_id,
        "SUBMISSION_DATE": "{{ ds }}",
        "IMPORT": "true",
        "USE_SAMPLE_ID": str(use_sample_id),
        **env_vars,
    }

    if task_type not in ["view", "init", "query"]:
        raise ValueError("task_type must be either view, init, or query")

    return GKEPodOperator(
        reattach_on_restart=True,
        task_id=f"{task_type}_{task_name}",
        cmds=["bash", "-c"],
        env_vars=env_vars,
        arguments=[
            "script/glam/generate_glean_sql && "
            "source script/glam/run_glam_sql && "
            f'run_{task_type} {query_name} false {min_sample_id} {max_sample_id} "" {write_preference}'
        ],
        image=docker_image,
        is_delete_operator_pod=False,
        **kwargs,
    )
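
# Illustrative usage sketch (not part of the original module). The task and
# product names below are assumptions; a common pattern is to run the init
# step before the query step for the same derived table, since the returned
# operators can be chained like any other Airflow tasks:
#
#     init_aggregates = generate_and_run_glean_task(
#         task_type="init",
#         task_name="clients_daily_scalar_aggregates",  # assumed task name
#         product="fenix",                               # assumed product
#         destination_project_id="example-project",     # assumed project
#     )
#     run_aggregates = generate_and_run_glean_task(
#         task_type="query",
#         task_name="clients_daily_scalar_aggregates",
#         product="fenix",
#         destination_project_id="example-project",
#     )
#     init_aggregates >> run_aggregates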