# backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py
import logging
import sys
from pathlib import Path

import click

# Project-local helpers; the module paths below follow the upstream
# bigquery-etl layout and are assumptions for this standalone snapshot.
from bigquery_etl.cli.utils import paths_matching_name_pattern
from bigquery_etl.metadata.parse_metadata import METADATA_FILE, Metadata
from bigquery_etl.query_scheduling.dag_collection import DagCollection
from bigquery_etl.query_scheduling.generate_airflow_dags import get_dags


def schedule(name, sql_dir, project_id, dag, depends_on_past, task_name):
    """CLI command for scheduling a query."""
    query_files = paths_matching_name_pattern(name, sql_dir, project_id)

    if not query_files:
        click.echo(f"Name doesn't refer to any queries: {name}", err=True)
        sys.exit(1)

    sql_dir = Path(sql_dir)
    dags = DagCollection.from_file(sql_dir.parent / "dags.yaml")
    dags_to_be_generated = set()

    for query_file in query_files:
        try:
            metadata = Metadata.of_query_file(query_file)
        except FileNotFoundError:
            click.echo(f"Cannot schedule {query_file}. No metadata.yaml found.")
            continue

        if dag:
            # check if DAG already exists
            existing_dag = dags.dag_by_name(dag)
            if not existing_dag:
                click.echo(
                    (
                        f"DAG {dag} does not exist. "
                        "To see available DAGs run `bqetl dag info`. "
                        "To create a new DAG run `bqetl dag create`."
                    ),
                    err=True,
                )
                sys.exit(1)

            # write scheduling information to the metadata file
            metadata.scheduling = {"dag_name": dag}
            if depends_on_past:
                metadata.scheduling["depends_on_past"] = depends_on_past
            if task_name:
                metadata.scheduling["task_name"] = task_name
            metadata.write(query_file.parent / METADATA_FILE)
            logging.info(
                f"Updated {query_file.parent / METADATA_FILE} with scheduling"
                " information. For more information about scheduling queries see: "
                "https://github.com/mozilla/bigquery-etl#scheduling-queries-in-airflow"
            )

            # update dags since a new task has been added
            dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir)
            dags_to_be_generated.add(dag)
        else:
            # no --dag given: regenerate the DAG already referenced in the
            # query's scheduling metadata, if present
            dags = get_dags(None, sql_dir.parent / "dags.yaml", sql_dir=sql_dir)
            if not metadata.scheduling:
                click.echo(f"No scheduling information for: {query_file}", err=True)
                sys.exit(1)
            dags_to_be_generated.add(metadata.scheduling["dag_name"])

    # re-run DAG generation for the affected DAGs
    for d in dags_to_be_generated:
        existing_dag = dags.dag_by_name(d)
        logging.info(f"Running DAG generation for {existing_dag.name}")
        output_dir = sql_dir.parent / "dags"
        dags.dag_to_airflow(output_dir, existing_dag)
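

# --- Hedged sketch: CLI wiring ---------------------------------------------
# Upstream, `schedule` is registered on the `bqetl query` click group with
# shared option helpers; the decorators, option names, and defaults below are
# assumptions for illustration so this snapshot can run standalone.
@click.command("schedule")
@click.argument("name")
@click.option("--sql_dir", default="sql/", help="Path to the directory with queries.")
@click.option("--project_id", default=None, help="GCP project to restrict the name match to.")
@click.option("--dag", default=None, help="Airflow DAG to schedule the query under.")
@click.option("--depends_on_past", is_flag=True, default=False)
@click.option("--task_name", default=None, help="Custom name for the generated Airflow task.")
def schedule_command(name, sql_dir, project_id, dag, depends_on_past, task_name):
    """Thin wrapper exposing `schedule` as a click command (illustrative)."""
    schedule(name, sql_dir, project_id, dag, depends_on_past, task_name)


# Example invocation (the DAG name here is illustrative):
#   bqetl query schedule telemetry_derived.clients_first_seen_v2 --dag bqetl_analytics_tables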