in backfill/2023-09-26-initialize-clients_first_seen_v2/bigquery_etl_cli_query.py [0:0]
def create(ctx, name, sql_dir, project_id, owner, init, dag, no_schedule):
    """CLI command for creating a new query."""
    # NOTE: the docstring above is deliberately left as a single line. If this
    # function is registered as a click command (decorators are not visible
    # from here), click would surface the docstring as the command help text.
    #
    # Parameters:
    #   ctx         -- context object; only used to invoke `schedule`.
    #   name        -- query name, matched against QUERY_NAME_RE; expected to
    #                  look like <dataset>.<table> or <dataset>.<table>_v[n].
    #   sql_dir     -- root directory under which
    #                  <project_id>/<dataset>/<table> directories are created.
    #   project_id  -- project directory name; also used in the view SQL.
    #   owner       -- written as the single entry of the metadata `owners`.
    #   init        -- when truthy, an init.sql file is written as well.
    #   dag         -- forwarded to `schedule` unless `no_schedule` is set.
    #   no_schedule -- when truthy, scheduling is skipped and a warning shown.
    #
    # Exits the process with status 1 when `name` cannot be parsed. The
    # `mkdir` calls do not pass `exist_ok`, so an already existing query or
    # view directory raises FileExistsError.
    derived_suffix = "_derived"

    # create directory structure for query
    try:
        match = QUERY_NAME_RE.match(name)
        name = match.group("name")
        dataset = match.group("dataset")
        # Treat the part after the last underscore as a version candidate;
        # if it is not a version, the whole name is kept and "_v1" is used.
        base_name, _, last_part = name.rpartition("_")
        version = "_" + last_part
        if not VERSION_RE.match(version):
            version = "_v1"
        else:
            name = base_name
    except AttributeError:
        # Raised when the regex did not match (match is None).
        click.echo(
            "New queries must be named like:"
            + " <dataset>.<table> or <dataset>.<table>_v[n]"
        )
        sys.exit(1)

    project_dir = Path(sql_dir) / project_id

    # Decide which dataset holds the table and which (if any) holds the view.
    # The view dataset is computed exactly once and reused for both the view
    # directory and the view SQL, and only the trailing "_derived" is removed
    # (str.replace would also strip occurrences in the middle of the name).
    view_dataset = None
    if dataset.endswith(derived_suffix):
        # create a directory for the corresponding view
        view_dataset = dataset[: -len(derived_suffix)]
    elif (project_dir / (dataset + derived_suffix)).exists():
        # there is a corresponding derived dataset: the table goes there and
        # the view stays in the dataset that was asked for
        view_dataset = dataset
        dataset = dataset + derived_suffix
    # otherwise: some dataset that is not specified as _derived,
    # don't automatically create views

    derived_path = project_dir / dataset / (name + version)
    derived_path.mkdir(parents=True)

    view_path = None
    if view_dataset is not None:
        view_path = project_dir / view_dataset / name
        view_path.mkdir(parents=True)

    click.echo(f"Created query in {derived_path}")

    # The SQL templates below keep their content flush-left so the literal
    # text is unchanged; each one is passed through reformat() before writing.
    if view_path is not None:
        click.echo(f"Created corresponding view in {view_path}")
        view_file = view_path / "view.sql"
        view_file.write_text(
            reformat(
                f"""CREATE OR REPLACE VIEW
`{project_id}.{view_dataset}.{name}`
AS SELECT * FROM
`{project_id}.{dataset}.{name}{version}`"""
            )
            + "\n"
        )

    # create query.sql file
    query_file = derived_path / "query.sql"
    query_file.write_text(
        reformat(
            f"""-- Query for {dataset}.{name}{version}
-- For more information on writing queries see:
-- https://docs.telemetry.mozilla.org/cookbooks/bigquery/querying.html
SELECT * FROM table WHERE submission_date = @submission_date"""
        )
        + "\n"
    )

    # create default metadata.yaml
    metadata_file = derived_path / "metadata.yaml"
    metadata = Metadata(
        friendly_name=string.capwords(name.replace("_", " ")),
        description="Please provide a description for the query",
        owners=[owner],
        labels={"incremental": True},
        bigquery=BigQueryMetadata(
            time_partitioning=PartitionMetadata(field="", type=PartitionType.DAY),
            clustering=ClusteringMetadata(fields=[]),
        ),
    )
    metadata.write(metadata_file)

    # optionally create init.sql
    if init:
        # NOTE(review): the table here is qualified with the configured
        # default project rather than `project_id` -- confirm this is intended.
        init_file = derived_path / "init.sql"
        init_file.write_text(
            reformat(
                f"""
-- SQL for initializing the query destination table.
CREATE OR REPLACE TABLE
`{ConfigLoader.get('default', 'project', fallback="moz-fx-data-shared-prod")}.{dataset}.{name}{version}`
AS SELECT * FROM table"""
            )
            + "\n"
        )

    # Create dataset_metadata.yaml for the table dataset and, when a view was
    # created, for the view dataset. Existing files are left untouched.
    dataset_dirs = [(derived_path.parent, "derived", False)]
    if view_path is not None:
        dataset_dirs.append((view_path.parent, "view", True))

    for dataset_dir, base_acl, user_facing in dataset_dirs:
        dataset_metadata_file = dataset_dir / "dataset_metadata.yaml"
        if dataset_metadata_file.exists():
            continue
        dataset_metadata = DatasetMetadata(
            friendly_name=string.capwords(dataset_dir.name.replace("_", " ")),
            description="Please provide a description for the dataset",
            dataset_base_acl=base_acl,
            user_facing=user_facing,
        )
        dataset_metadata.write(dataset_metadata_file)
        click.echo(f"Created dataset metadata in {dataset_metadata_file}")

    if no_schedule:
        click.echo(
            click.style(
                "WARNING: This query has been created without "
                "scheduling information. Use `bqetl query schedule`"
                " to manually add it to a DAG or "
                "`bqetl query create --help` for more options.",
                fg="yellow",
            )
        )
    else:
        # The query directory path is what gets passed as `name` to schedule.
        ctx.invoke(schedule, name=derived_path, dag=dag)