in bigquery_etl/query_scheduling/task.py [0:0]
@classmethod
def of_query(cls, query_file, metadata=None, dag_collection=None):
    """
    Create a task that schedules the corresponding query in Airflow.

    Raises FileNotFoundError if no metadata file exists for the query.
    If `metadata` is set, it is used instead of the metadata.yaml file
    that might exist alongside the query file.
    """
    converter = cattrs.BaseConverter()

    if metadata is None:
        metadata = Metadata.of_query_file(query_file)

    dag_name = metadata.scheduling.get("dag_name")
    if dag_name is None:
        raise UnscheduledTask(
            f"Metadata for {query_file} does not contain scheduling information."
        )
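    # A missing dag_name means the query is not scheduled; raising
    # UnscheduledTask signals to callers that no Airflow task should be
    # generated for this query.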

    task_config = {"query_file": str(query_file)}
    task_config.update(metadata.scheduling)
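    # At this point task_config holds query_file plus every key from the
    # scheduling section of metadata.yaml (dag_name is one such key; the
    # full set depends on the fields this class defines).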

    if not metadata.owners:
        raise TaskParseException(f"No owner specified in metadata for {query_file}.")

    # Airflow only allows setting a single owner, so take the first one.
    task_config["owner"] = metadata.owners[0]

    # Get default email from default_args if available.
    default_email = []
    if dag_collection is not None:
        dag = dag_collection.dag_by_name(dag_name)
        if dag is not None:
            default_email = dag.default_args.email
    email = task_config.get("email", default_email)
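    # Precedence: an explicit `email` in the scheduling section wins;
    # otherwise the DAG's default_args.email (if the DAG is known) is used.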

    # Remove invalid emails (e.g. GitHub identities) from the owners and add
    # the remaining ones to the Airflow email list. Iterate over a copy,
    # since removing from a list while iterating over it skips elements.
    for owner in list(metadata.owners):
        if not is_email(owner):
            metadata.owners.remove(owner)
            click.echo(f"{owner} removed from email list in DAG {dag_name}")
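    # The final notification list is the union of the configured emails and
    # the remaining owner emails; set() deduplicates, but the resulting
    # order is arbitrary.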
task_config["email"] = list(set(email + metadata.owners))
# expose secret config
task_config["secrets"] = metadata.scheduling.get("secrets", [])

    # Used to determine whether a BigEye monitoring task should be generated.
    if metadata.monitoring:
        task_config["monitoring_enabled"] = metadata.monitoring.enabled

    # Data processed in this task should be published as public JSON.
    if metadata.is_public_json():
        task_config["public_json"] = True

    # If no `destination_table` is set in the scheduling section of the
    # metadata but the table is time-partitioned, pass a Jinja template that
    # reformats the date string used for the table partition decorator.
    # See the docs for the formatting conventions:
    # https://cloud.google.com/bigquery/docs/managing-partitioned-table-data#partition_decorators
    if (
        metadata.bigquery
        and metadata.bigquery.time_partitioning
        and metadata.scheduling.get("destination_table") is None
    ):
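        # Map the configured partition granularity to a Jinja expression
        # that Airflow renders from the run's logical date at execution time.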
        match metadata.bigquery.time_partitioning.type:
            case PartitionType.YEAR:
                partition_template = '${{ dag_run.logical_date.strftime("%Y") }}'
            case PartitionType.MONTH:
                partition_template = '${{ dag_run.logical_date.strftime("%Y%m") }}'
            case PartitionType.DAY:
                # Skip for the default case of daily partitioning.
                partition_template = None
            case PartitionType.HOUR:
                partition_template = (
                    '${{ dag_run.logical_date.strftime("%Y%m%d%H") }}'
                )
            case _:
                raise TaskParseException(
                    f"Invalid partition type: "
                    f"{metadata.bigquery.time_partitioning.type}"
                )

        if partition_template:
            task_config["table_partition_template"] = partition_template

    try:
        return copy.deepcopy(converter.structure(task_config, cls))
    except TypeError as e:
        raise TaskParseException(
            f"Invalid scheduling information format for {query_file}: {e}"
        ) from e
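

# Example usage (a hedged sketch: the class name `Task` and the exact
# metadata layout are assumptions based on this module, not confirmed API):
#
#   metadata.yaml next to the query file:
#     scheduling:
#       dag_name: bqetl_example
#     owners:
#       - someone@example.com
#
#   task = Task.of_query(Path("sql/project/dataset/table_v1/query.sql"))
#   task.dag_name  # "bqetl_example"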