bigquery_etl/query_scheduling/generate_airflow_dags.py (124 lines of code) (raw):

"""Generates Airflow DAGs for scheduled queries."""

import copy
import logging
import os
from argparse import ArgumentParser
from pathlib import Path

from bigquery_etl.query_scheduling.dag_collection import DagCollection
from bigquery_etl.query_scheduling.task import Task, UnscheduledTask
from bigquery_etl.util import standard_args
from bigquery_etl.util.common import project_dirs

DEFAULT_DAGS_FILE = "dags.yaml"
QUERY_FILE = "query.sql"
QUERY_PART_FILE = "part1.sql"
SCRIPT_FILE = "script.sql"
PYTHON_SCRIPT_FILE = "query.py"
DEFAULT_DAGS_DIR = "dags"
CHECKS_FILE = "checks.sql"
BIGEYE_FILE = "bigconfig.yml"

parser = ArgumentParser(description=__doc__)
parser.add_argument(
    "--dags_config",
    "--dags-config",
    help="File with DAGs configuration",
    default=DEFAULT_DAGS_FILE,
)
parser.add_argument(
    "--output_dir",
    "--output-dir",
    help="Generated DAGs are written to this output directory.",
    default=DEFAULT_DAGS_DIR,
)
parser.add_argument(
    "--dag_id",
    # Hyphenated alias, matching the underscore/hyphen pairs of the other options.
    "--dag-id",
    help="The DAG to generate. Generates all DAGs by default",
    default=None,
)
parser.add_argument(
    "--project_id",
    "--project-id",
    help="Project to generate DAGs for. "
    "If not specified, generate for all projects.",
    default=None,
    required=False,
)
standard_args.add_log_level(parser)


def _find_query_task(root, files, dag_collection):
    """Build the task for the query definition found in a directory, if any.

    The directory is checked for the known query file names in priority
    order (query.sql, part1.sql, script.sql, query.py); only the first match
    is used.

    Args:
        root: Path of the directory being inspected.
        files: Names of the files directly inside ``root``.
        dag_collection: DAG collection passed through to the ``Task`` factory.

    Returns:
        The created task, or ``None`` when the directory contains no query
        file or task creation raised ``FileNotFoundError`` or
        ``UnscheduledTask``.

    Raises:
        Exception: Any other error raised while creating the task is logged
            together with the offending query file and re-raised.
    """
    # Resolved at call time so the lookup always sees the current Task methods.
    factories = (
        (QUERY_FILE, Task.of_query),
        (QUERY_PART_FILE, Task.of_multipart_query),  # multipart query
        (SCRIPT_FILE, Task.of_script),
        (PYTHON_SCRIPT_FILE, Task.of_python_script),
    )
    for file_name, factory in factories:
        if file_name in files:
            query_file = os.path.join(root, file_name)
            break
    else:
        return None

    try:
        return factory(query_file, dag_collection=dag_collection)
    except FileNotFoundError:
        # query has no metadata.yaml file; skip
        return None
    except UnscheduledTask:
        # most tasks lack scheduling information for now, so this is
        # deliberately not logged
        return None
    except Exception:
        # in the case that there was some other error, report the query
        # that failed before exiting
        logging.error(f"Error processing task for query {query_file}")
        raise


def _monitoring_tasks(root, files, dag_collection):
    """Build the Bigeye and data-quality check tasks for a directory.

    Args:
        root: Path of the directory being inspected.
        files: Names of the files directly inside ``root``.
        dag_collection: DAG collection passed through to the ``Task`` factories.

    Returns:
        A list holding, in this order: the Bigeye task (only when a
        bigconfig.yml exists and the task has monitoring enabled), a failing
        check task (when checks.sql contains ``#fail``) and a warning check
        task (when checks.sql contains ``#warn``).
    """
    found = []

    if BIGEYE_FILE in files:
        bigeye_file = os.path.join(root, BIGEYE_FILE)
        bigeye_task = copy.deepcopy(
            Task.of_bigeye_check(bigeye_file, dag_collection=dag_collection)
        )
        if bigeye_task.monitoring_enabled:
            found.append(bigeye_task)

    if CHECKS_FILE in files:
        checks_file = os.path.join(root, CHECKS_FILE)
        # todo: validate checks file
        with open(checks_file, "r") as file:
            file_contents = file.read()

        # One task per marker kind present in the checks file.
        for marker, is_check_fail in (("#fail", True), ("#warn", False)):
            if marker in file_contents:
                found.append(
                    copy.deepcopy(
                        Task.of_dq_check(
                            checks_file,
                            is_check_fail=is_check_fail,
                            dag_collection=dag_collection,
                        )
                    )
                )

    return found


def get_dags(project_id, dags_config, sql_dir=None):
    """Return all configured DAGs including associated tasks.

    Args:
        project_id: Project selector handed to ``project_dirs``.
        dags_config: Path of the DAGs configuration file loaded through
            ``DagCollection.from_file``.
        sql_dir: Optional SQL directory handed to ``project_dirs``.

    Returns:
        The result of ``DagCollection.with_tasks`` for every task discovered
        while walking the project directories.

    Raises:
        Exception: Errors raised during task creation, other than
            ``FileNotFoundError`` and ``UnscheduledTask``, are propagated.
    """
    tasks = []
    dag_collection = DagCollection.from_file(dags_config)

    for project_dir in project_dirs(project_id, sql_dir=sql_dir):
        if not os.path.isdir(project_dir):
            logging.error(
                (
                    " Invalid project_dir: {}, project_dir must be a directory with "
                    "structure <sql>/<project>/<dataset>/<table>/metadata.yaml. "
                ).format(project_dir)
            )
            continue

        # parse metadata.yaml to retrieve scheduling information
        for root, _dirs, files in os.walk(project_dir, followlinks=True):
            task = _find_query_task(root, files, dag_collection)
            if task is None:
                continue

            # Check tasks are registered ahead of the query task itself.
            tasks.extend(_monitoring_tasks(root, files, dag_collection))
            tasks.append(task)

    return dag_collection.with_tasks(tasks)


def main():
    """Generate Airflow DAGs."""
    args = parser.parse_args()
    dags_output_dir = Path(args.output_dir)

    dags = get_dags(args.project_id, args.dags_config)
    dags.to_airflow_dags(dags_output_dir, args.dag_id)


if __name__ == "__main__":
    main()