bigquery_etl/cli/dag.py (211 lines of code) (raw):

"""bigquery-etl CLI dag command.""" import os import sys from pathlib import Path import rich_click as click import yaml from ..cli.utils import is_valid_dir, is_valid_file, sql_dir_option from ..metadata.parse_metadata import METADATA_FILE, Metadata from ..query_scheduling.dag import Dag from ..query_scheduling.dag_collection import DagCollection from ..query_scheduling.generate_airflow_dags import get_dags dags_config_option = click.option( "--dags_config", "--dags-config", help="Path to dags.yaml config file", type=click.Path(file_okay=True), default="dags.yaml", callback=is_valid_file, ) output_dir_option = click.option( "--output-dir", "--output_dir", help="Path directory with generated DAGs", type=click.Path(file_okay=False), default="dags/", callback=is_valid_dir, ) @click.group(help="Commands for managing DAGs.") def dag(): """Create the CLI group for the dag command.""" pass @dag.command( help="""Get information about available DAGs. Examples: # Get information about all available DAGs ./bqetl dag info # Get information about a specific DAG ./bqetl dag info bqetl_ssl_ratios # Get information about a specific DAG including scheduled tasks ./bqetl dag info --with_tasks bqetl_ssl_ratios """, ) @click.argument("name", required=False) @dags_config_option @sql_dir_option @click.option( "--with_tasks", "--with-tasks", "-t", help="Include scheduled tasks", default=False, is_flag=True, ) def info(name, dags_config, sql_dir, with_tasks): """List available DAG information.""" if with_tasks: dag_collection = get_dags(None, dags_config, sql_dir=sql_dir) else: dag_collection = DagCollection.from_file(dags_config) if name: dag = dag_collection.dag_by_name(name) if not dag: click.echo(f"DAG {name} does not exist", err=True) sys.exit(1) sorted_dags = [dag] else: sorted_dags = sorted(dag_collection.dags, key=lambda d: d.name) for dag in sorted_dags: click.secho(dag.name, bold=True) click.echo(f"schedule_interval: {dag.schedule_interval}") click.echo(f"owner: {dag.default_args.owner}") if with_tasks: click.echo("tasks: ") for task in sorted(dag.tasks, key=lambda d: d.table): click.echo( f" - {task.project}.{task.dataset}.{task.table}_{task.version}" ) click.echo("") @dag.command( help="""Create a new DAG with name bqetl_<dag_name>, for example: bqetl_search When creating new DAGs, the DAG name must have a `bqetl_` prefix. Created DAGs are added to the `dags.yaml` file. Examples: \b ./bqetl dag create bqetl_core \\ --schedule-interval="0 2 * * *" \\ --owner=example@mozilla.com \\ --description="Tables derived from `core` pings sent by mobile applications." \\ --tag=impact/tier_1 \\ --start-date=2019-07-25 \b # Create DAG and overwrite default settings ./bqetl dag create bqetl_ssl_ratios --schedule-interval="0 2 * * *" \\ --owner=example@mozilla.com \\ --description="The DAG schedules SSL ratios queries." \\ --tag=impact/tier_1 \\ --start-date=2019-07-20 \\ --email=example2@mozilla.com \\ --email=example3@mozilla.com \\ --retries=2 \\ --retry_delay=30m """ ) @click.argument("name") @dags_config_option @click.option( "--schedule_interval", "--schedule-interval", help=( "Schedule interval of the new DAG. " "Schedule intervals can be either in CRON format or one of: " "once, hourly, daily, weekly, monthly, yearly or a timedelta []d[]h[]m" ), required=True, ) @click.option( "--owner", help=("Email address of the DAG owner"), required=True, ) @click.option( "--description", help=("Description for DAG"), required=True, ) @click.option( "--tag", help=("Tag to apply to the DAG"), required=True, multiple=True, ) @click.option( "--start_date", "--start-date", help=("First date for which scheduled queries should be executed"), required=True, ) @click.option( "--email", help=("Email addresses that Airflow will send alerts to"), default=["telemetry-alerts@mozilla.com"], multiple=True, ) @click.option( "--retries", help=("Number of retries Airflow will attempt in case of failures"), default=2, ) @click.option( "--retry_delay", "--retry-delay", help=( "Time period Airflow will wait after failures before running failed tasks again" ), default="30m", ) @click.option( "--catchup", help=("Allow DAG to run for past dates if its start date is in the past"), default=False, ) def create( name, dags_config, schedule_interval, owner, description, tag, start_date, email, retries, retry_delay, catchup, ): """Create a new DAG.""" # create a DAG and validate all properties new_dag = Dag.from_dict( { name: { "description": description, "schedule_interval": schedule_interval, "default_args": { "owner": owner, "start_date": start_date, "email": (*email, owner), "retries": retries, "retry_delay": retry_delay, }, "catchup": catchup, "tags": tag, } } ) with open(dags_config, "a") as dags_file: dags_file.write("\n") dags_file.write(yaml.dump(new_dag.to_dict())) dags_file.write("\n") click.echo(f"Added new DAG definition to {dags_config}") @dag.command( help="""Generate Airflow DAGs from DAG definitions. Examples: # Generate all DAGs ./bqetl dag generate # Generate a specific DAG ./bqetl dag generate bqetl_ssl_ratios """ ) @click.argument("name", required=False) @dags_config_option @sql_dir_option @output_dir_option def generate(name, dags_config, sql_dir, output_dir): """CLI command for generating Airflow DAGs.""" dags = get_dags(None, dags_config, sql_dir) if name: # only generate specific DAG dag = dags.dag_by_name(name) if not dag: click.echo(f"DAG {name} does not exist.", err=True) sys.exit(1) dags.to_airflow_dags(output_dir, dag_to_generate=dag) click.echo(f"Generated {os.path.join(output_dir, dag.name)}.py") else: # re-generate all DAGs dags.to_airflow_dags(output_dir) click.echo("DAG generation complete.") @dag.command( help="""Remove a DAG. This will also remove the scheduling information from the queries that were scheduled as part of the DAG. Examples: # Remove a specific DAG ./bqetl dag remove bqetl_vrbrowser """ ) @click.argument("name", required=False) @dags_config_option @sql_dir_option @output_dir_option def remove(name, dags_config, sql_dir, output_dir): """ CLI command for removing a DAG. Also removes scheduling information from queries that were referring to the DAG. """ # remove from task schedulings dags = get_dags(None, dags_config, sql_dir=sql_dir) dag_tbr = dags.dag_by_name(name) if not dag_tbr: click.echo(f"No existing DAG definition for {name}") sys.exit(1) for task in dag_tbr.tasks: metadata = Metadata.of_query_file(task.query_file) sql_path = Path(os.path.dirname(task.query_file)) metadata.scheduling = {} metadata_file = sql_path / METADATA_FILE metadata.write(metadata_file) # remove from dags.yaml with open(dags_config) as dags_file: dags_config_dict = yaml.full_load(dags_file) del dags_config_dict[name] with open(dags_config, "w") as dags_file: dags_file.write(yaml.dump(dags_config_dict)) output_dir = Path(output_dir) # delete generated DAG from dags/ if os.path.exists(output_dir / (name + ".py")): os.remove(output_dir / (name + ".py")) click.echo(f"DAG {name} and referenced removed.")