in bigquery_etl/query_scheduling/generate_airflow_dags.py [0:0]
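# Excerpt; assumes module-level imports of copy, logging, and os, plus Task,
# DagCollection, UnscheduledTask, project_dirs, and the *_FILE name constants.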
def get_dags(project_id, dags_config, sql_dir=None):
"""Return all configured DAGs including associated tasks."""
tasks = []
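    # load the configured DAG definitions; tasks discovered below are attached
    # to these DAGs based on their scheduling metadata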
dag_collection = DagCollection.from_file(dags_config)
for project_dir in project_dirs(project_id, sql_dir=sql_dir):
# parse metadata.yaml to retrieve scheduling information
if os.path.isdir(project_dir):
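            # walk the project tree; followlinks=True so symlinked dataset
            # directories are traversed as well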
for root, dirs, files in os.walk(project_dir, followlinks=True):
try:
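                    # each leaf directory contains at most one runnable
                    # artifact; build the matching task type for it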
if QUERY_FILE in files:
query_file = os.path.join(root, QUERY_FILE)
task = Task.of_query(query_file, dag_collection=dag_collection)
elif QUERY_PART_FILE in files:
# multipart query
query_file = os.path.join(root, QUERY_PART_FILE)
task = Task.of_multipart_query(
query_file, dag_collection=dag_collection
)
elif SCRIPT_FILE in files:
query_file = os.path.join(root, SCRIPT_FILE)
task = Task.of_script(query_file, dag_collection=dag_collection)
elif PYTHON_SCRIPT_FILE in files:
query_file = os.path.join(root, PYTHON_SCRIPT_FILE)
task = Task.of_python_script(
query_file, dag_collection=dag_collection
)
else:
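                        # no runnable artifact in this directory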
continue
except FileNotFoundError:
# query has no metadata.yaml file; skip
pass
                except UnscheduledTask:
                    # most tasks lack scheduling information for now; skip them
                    # without logging each one
                    pass
                except Exception:
                    # any other error: report which query failed, then re-raise
                    logging.error(f"Error processing task for query {query_file}")
                    raise
else:
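                    # no exception: a task was created for this directory, so
                    # attach any associated monitoring and data checks first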
if BIGEYE_FILE in files:
bigeye_file = os.path.join(root, BIGEYE_FILE)
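                        # deepcopy so the check task does not share mutable
                        # state with any cached Task instance (assumed reason)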
bigeye_task = copy.deepcopy(
Task.of_bigeye_check(
bigeye_file,
dag_collection=dag_collection,
)
)
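                        # only schedule the Bigeye check when monitoring is
                        # enabled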
if bigeye_task.monitoring_enabled:
tasks.append(bigeye_task)
if CHECKS_FILE in files:
checks_file = os.path.join(root, CHECKS_FILE)
                        # TODO: validate the checks file
with open(checks_file, "r") as file:
file_contents = file.read()
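                            # a checks file may define both blocking (#fail)
                            # and non-blocking (#warn) assertions; create a
                            # separate task for each marker present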
if "#fail" in file_contents:
checks_task = copy.deepcopy(
Task.of_dq_check(
checks_file,
is_check_fail=True,
dag_collection=dag_collection,
)
)
tasks.append(checks_task)
if "#warn" in file_contents:
checks_task = copy.deepcopy(
Task.of_dq_check(
checks_file,
is_check_fail=False,
dag_collection=dag_collection,
)
)
tasks.append(checks_task)
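                    # finally, register the task discovered for this directory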
tasks.append(task)
        else:
            # project_dirs() yielded a path that is not a directory
            logging.error(f"Invalid project directory: {project_dir}")

    # associate the collected tasks with their configured DAGs
    return dag_collection.with_tasks(tasks)
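
# Usage sketch (hypothetical argument values):
#   dags = get_dags("moz-fx-data-shared-prod", "dags.yaml", sql_dir="sql")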