bigquery_etl/cli/dag.py (211 lines of code) (raw):
"""bigquery-etl CLI dag command."""
import os
import sys
from pathlib import Path
import rich_click as click
import yaml
from ..cli.utils import is_valid_dir, is_valid_file, sql_dir_option
from ..metadata.parse_metadata import METADATA_FILE, Metadata
from ..query_scheduling.dag import Dag
from ..query_scheduling.dag_collection import DagCollection
from ..query_scheduling.generate_airflow_dags import get_dags
dags_config_option = click.option(
"--dags_config",
"--dags-config",
help="Path to dags.yaml config file",
type=click.Path(file_okay=True),
default="dags.yaml",
callback=is_valid_file,
)
output_dir_option = click.option(
"--output-dir",
"--output_dir",
help="Path directory with generated DAGs",
type=click.Path(file_okay=False),
default="dags/",
callback=is_valid_dir,
)
@click.group(help="Commands for managing DAGs.")
def dag():
    """Create the CLI group for the dag command."""
    # Group container only; subcommands are registered via @dag.command below.
@dag.command(
    help="""Get information about available DAGs.
    Examples:
    # Get information about all available DAGs
    ./bqetl dag info
    # Get information about a specific DAG
    ./bqetl dag info bqetl_ssl_ratios
    # Get information about a specific DAG including scheduled tasks
    ./bqetl dag info --with_tasks bqetl_ssl_ratios
    """,
)
@click.argument("name", required=False)
@dags_config_option
@sql_dir_option
@click.option(
    "--with_tasks",
    "--with-tasks",
    "-t",
    help="Include scheduled tasks",
    default=False,
    is_flag=True,
)
def info(name, dags_config, sql_dir, with_tasks):
    """List available DAG information."""
    # Resolving tasks requires parsing query metadata, so only do the expensive
    # get_dags() call when task output was requested.
    dag_collection = (
        get_dags(None, dags_config, sql_dir=sql_dir)
        if with_tasks
        else DagCollection.from_file(dags_config)
    )

    if name:
        selected = dag_collection.dag_by_name(name)
        if not selected:
            click.echo(f"DAG {name} does not exist", err=True)
            sys.exit(1)
        sorted_dags = [selected]
    else:
        sorted_dags = sorted(dag_collection.dags, key=lambda d: d.name)

    for current_dag in sorted_dags:
        click.secho(current_dag.name, bold=True)
        click.echo(f"schedule_interval: {current_dag.schedule_interval}")
        click.echo(f"owner: {current_dag.default_args.owner}")
        if with_tasks:
            click.echo("tasks: ")
            for task in sorted(current_dag.tasks, key=lambda t: t.table):
                click.echo(
                    f"  - {task.project}.{task.dataset}.{task.table}_{task.version}"
                )
        click.echo("")
@dag.command(
    help="""Create a new DAG with name bqetl_<dag_name>, for example: bqetl_search
    When creating new DAGs, the DAG name must have a `bqetl_` prefix.
    Created DAGs are added to the `dags.yaml` file.
    Examples:
    \b
    ./bqetl dag create bqetl_core \\
    --schedule-interval="0 2 * * *" \\
    --owner=example@mozilla.com \\
    --description="Tables derived from `core` pings sent by mobile applications." \\
    --tag=impact/tier_1 \\
    --start-date=2019-07-25
    \b
    # Create DAG and overwrite default settings
    ./bqetl dag create bqetl_ssl_ratios --schedule-interval="0 2 * * *" \\
    --owner=example@mozilla.com \\
    --description="The DAG schedules SSL ratios queries." \\
    --tag=impact/tier_1 \\
    --start-date=2019-07-20 \\
    --email=example2@mozilla.com \\
    --email=example3@mozilla.com \\
    --retries=2 \\
    --retry_delay=30m
    """
)
@click.argument("name")
@dags_config_option
@click.option(
    "--schedule_interval",
    "--schedule-interval",
    help=(
        "Schedule interval of the new DAG. "
        "Schedule intervals can be either in CRON format or one of: "
        "once, hourly, daily, weekly, monthly, yearly or a timedelta []d[]h[]m"
    ),
    required=True,
)
@click.option(
    "--owner",
    help=("Email address of the DAG owner"),
    required=True,
)
@click.option(
    "--description",
    help=("Description for DAG"),
    required=True,
)
@click.option(
    "--tag",
    help=("Tag to apply to the DAG"),
    required=True,
    multiple=True,
)
@click.option(
    "--start_date",
    "--start-date",
    help=("First date for which scheduled queries should be executed"),
    required=True,
)
@click.option(
    "--email",
    help=("Email addresses that Airflow will send alerts to"),
    default=["telemetry-alerts@mozilla.com"],
    multiple=True,
)
@click.option(
    "--retries",
    help=("Number of retries Airflow will attempt in case of failures"),
    default=2,
)
@click.option(
    "--retry_delay",
    "--retry-delay",
    help=(
        "Time period Airflow will wait after failures before running failed tasks again"
    ),
    default="30m",
)
@click.option(
    "--catchup",
    help=("Allow DAG to run for past dates if its start date is in the past"),
    default=False,
)
def create(
    name,
    dags_config,
    schedule_interval,
    owner,
    description,
    tag,
    start_date,
    email,
    retries,
    retry_delay,
    catchup,
):
    """Create a new DAG."""
    # Assemble the DAG definition as plain dicts first, then run it through
    # Dag.from_dict so every property is validated before dags.yaml is touched.
    # The owner is always appended to the alert email list.
    default_args = {
        "owner": owner,
        "start_date": start_date,
        "email": (*email, owner),
        "retries": retries,
        "retry_delay": retry_delay,
    }
    dag_definition = {
        "description": description,
        "schedule_interval": schedule_interval,
        "default_args": default_args,
        "catchup": catchup,
        "tags": tag,
    }
    new_dag = Dag.from_dict({name: dag_definition})

    # Append the validated definition to the config file, surrounded by blank
    # lines so it stays visually separated from existing entries.
    with open(dags_config, "a") as dags_file:
        dags_file.write("\n" + yaml.dump(new_dag.to_dict()) + "\n")

    click.echo(f"Added new DAG definition to {dags_config}")
@dag.command(
    help="""Generate Airflow DAGs from DAG definitions.
    Examples:
    # Generate all DAGs
    ./bqetl dag generate
    # Generate a specific DAG
    ./bqetl dag generate bqetl_ssl_ratios
    """
)
@click.argument("name", required=False)
@dags_config_option
@sql_dir_option
@output_dir_option
def generate(name, dags_config, sql_dir, output_dir):
    """CLI command for generating Airflow DAGs.

    Writes one <dag_name>.py file per DAG into output_dir. When NAME is
    given, only that DAG is regenerated; otherwise all DAGs are.
    Exits with status 1 if NAME does not match any defined DAG.
    """
    # Pass sql_dir as a keyword for consistency with `info` and `remove`.
    dags = get_dags(None, dags_config, sql_dir=sql_dir)
    if name:
        # only generate specific DAG
        dag_to_generate = dags.dag_by_name(name)
        if not dag_to_generate:
            click.echo(f"DAG {name} does not exist.", err=True)
            sys.exit(1)
        dags.to_airflow_dags(output_dir, dag_to_generate=dag_to_generate)
        click.echo(f"Generated {os.path.join(output_dir, dag_to_generate.name)}.py")
    else:
        # re-generate all DAGs
        dags.to_airflow_dags(output_dir)
        click.echo("DAG generation complete.")
@dag.command(
    help="""Remove a DAG.
    This will also remove the scheduling information from the queries that were scheduled
    as part of the DAG.
    Examples:
    # Remove a specific DAG
    ./bqetl dag remove bqetl_vrbrowser
    """
)
@click.argument("name", required=False)
@dags_config_option
@sql_dir_option
@output_dir_option
def remove(name, dags_config, sql_dir, output_dir):
    """
    CLI command for removing a DAG.

    Clears the `scheduling` metadata from every query that was scheduled as
    part of the DAG, removes the DAG entry from dags.yaml, and deletes any
    previously generated <name>.py file from output_dir. Exits with status 1
    if no DAG definition with NAME exists.
    """
    # remove from task schedulings
    dags = get_dags(None, dags_config, sql_dir=sql_dir)
    dag_tbr = dags.dag_by_name(name)
    if not dag_tbr:
        click.echo(f"No existing DAG definition for {name}")
        sys.exit(1)

    # Wipe the scheduling section of each scheduled query's metadata file so
    # nothing keeps referring to the removed DAG.
    for task in dag_tbr.tasks:
        metadata = Metadata.of_query_file(task.query_file)
        metadata.scheduling = {}
        metadata.write(Path(task.query_file).parent / METADATA_FILE)

    # remove from dags.yaml
    with open(dags_config) as dags_file:
        dags_config_dict = yaml.full_load(dags_file)
    del dags_config_dict[name]
    with open(dags_config, "w") as dags_file:
        dags_file.write(yaml.dump(dags_config_dict))

    # delete generated DAG from the output directory, if one exists
    generated_dag = Path(output_dir) / f"{name}.py"
    if generated_dag.exists():
        generated_dag.unlink()

    click.echo(f"DAG {name} and its references removed.")