def get_dags()

in bigquery_etl/query_scheduling/generate_airflow_dags.py [0:0]
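Based on the body below: the function walks every project directory, builds a Task for each query, multipart query, script, or Python script that carries scheduling metadata, attaches any Bigeye monitoring and data-quality check tasks defined alongside it, and collects everything into the configured DAGs.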


def get_dags(project_id, dags_config, sql_dir=None):
    """Return all configured DAGs including associated tasks."""
    tasks = []
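    # load the DAG definitions from the DAG configuration file (e.g. dags.yaml)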
    dag_collection = DagCollection.from_file(dags_config)
    for project_dir in project_dirs(project_id, sql_dir=sql_dir):
        # parse metadata.yaml to retrieve scheduling information
        if os.path.isdir(project_dir):
            for root, dirs, files in os.walk(project_dir, followlinks=True):
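                # dispatch on the type of run file found in this directory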
                try:
                    if QUERY_FILE in files:
                        query_file = os.path.join(root, QUERY_FILE)
                        task = Task.of_query(query_file, dag_collection=dag_collection)
                    elif QUERY_PART_FILE in files:
                        # multipart query
                        query_file = os.path.join(root, QUERY_PART_FILE)
                        task = Task.of_multipart_query(
                            query_file, dag_collection=dag_collection
                        )
                    elif SCRIPT_FILE in files:
                        query_file = os.path.join(root, SCRIPT_FILE)
                        task = Task.of_script(query_file, dag_collection=dag_collection)
                    elif PYTHON_SCRIPT_FILE in files:
                        query_file = os.path.join(root, PYTHON_SCRIPT_FILE)
                        task = Task.of_python_script(
                            query_file, dag_collection=dag_collection
                        )
                    else:
                        continue
                except FileNotFoundError:
                    # query has no metadata.yaml file; skip
                    pass
                except UnscheduledTask:
                    # logging.debug(f"No scheduling information for {query_file}.")
                    # most tasks lack scheduling information for now
                    pass
                except Exception:
                    # for any other error, report the query that failed before
                    # re-raising
                    logging.error(f"Error processing task for query {query_file}")
                    raise
                else:
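                    # the task parsed successfully; also collect any Bigeye
                    # monitoring and data-quality check tasks defined alongside it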
                    if BIGEYE_FILE in files:
                        bigeye_file = os.path.join(root, BIGEYE_FILE)
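                        # copy the task so the appended instance is fully independent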
                        bigeye_task = copy.deepcopy(
                            Task.of_bigeye_check(
                                bigeye_file,
                                dag_collection=dag_collection,
                            )
                        )

                        if bigeye_task.monitoring_enabled:
                            tasks.append(bigeye_task)

                    if CHECKS_FILE in files:
                        checks_file = os.path.join(root, CHECKS_FILE)
                        # TODO: validate checks file

                        with open(checks_file, "r") as file:
                            file_contents = file.read()

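                            # a checks file may define both #fail and #warn checks;
                            # a separate task is created for each marker present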
                            if "#fail" in file_contents:
                                checks_task = copy.deepcopy(
                                    Task.of_dq_check(
                                        checks_file,
                                        is_check_fail=True,
                                        dag_collection=dag_collection,
                                    )
                                )
                                tasks.append(checks_task)

                            if "#warn" in file_contents:
                                checks_task = copy.deepcopy(
                                    Task.of_dq_check(
                                        checks_file,
                                        is_check_fail=False,
                                        dag_collection=dag_collection,
                                    )
                                )
                                tasks.append(checks_task)

                    tasks.append(task)
        else:
            logging.error(f"Invalid project directory: {project_dir}")

    return dag_collection.with_tasks(tasks)
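
A minimal usage sketch, assuming the surrounding bigquery-etl setup; the project id, config path, and sql directory below are hypothetical:

# hypothetical arguments; get_dags returns the DagCollection with tasks attached
dags = get_dags(
    project_id="my-gcp-project",
    dags_config="dags.yaml",
    sql_dir="sql",
)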