def of_query()

in bigquery_etl/query_scheduling/task.py [0:0]


    def of_query(cls, query_file, metadata=None, dag_collection=None):
        """
        Create task that schedules the corresponding query in Airflow.

        Raises FileNotFoundError if not metadata file exists for query.
        If `metadata` is set, then it is used instead of the metadata.yaml
        file that might exist alongside the query file.
        """
        converter = cattrs.BaseConverter()
        if metadata is None:
            metadata = Metadata.of_query_file(query_file)

        dag_name = metadata.scheduling.get("dag_name")
        if dag_name is None:
            raise UnscheduledTask(
                f"Metadata for {query_file} does not contain scheduling information."
            )

        task_config = {"query_file": str(query_file)}
        task_config.update(metadata.scheduling)

        if len(metadata.owners) <= 0:
            raise TaskParseException(
                f"No owner specified in metadata for {query_file}."
            )

        # Airflow only allows to set one owner, so we just take the first
        task_config["owner"] = metadata.owners[0]

        # Get default email from default_args if available
        default_email = []
        if dag_collection is not None:
            dag = dag_collection.dag_by_name(dag_name)
            if dag is not None:
                default_email = dag.default_args.email
        email = task_config.get("email", default_email)
        # Remove non-valid emails from owners e.g. Github identities and add to
        # Airflow email list.
        for owner in metadata.owners:
            if not is_email(owner):
                metadata.owners.remove(owner)
                click.echo(
                    f"{owner} removed from email list in DAG {metadata.scheduling['dag_name']}"
                )
        task_config["email"] = list(set(email + metadata.owners))

        # expose secret config
        task_config["secrets"] = metadata.scheduling.get("secrets", [])

        # to determine if BigEye task should be generated
        if metadata.monitoring:
            task_config["monitoring_enabled"] = metadata.monitoring.enabled

        # data processed in task should be published
        if metadata.is_public_json():
            task_config["public_json"] = True

        # Override the table_partition_template if there is no `destination_table`
        # set in the scheduling section of the metadata. If not then pass a jinja
        # template that reformats the date string used for table partition decorator.
        # See doc here for formatting conventions:
        #  https://cloud.google.com/bigquery/docs/managing-partitioned-table-data#partition_decorators
        if (
            metadata.bigquery
            and metadata.bigquery.time_partitioning
            and metadata.scheduling.get("destination_table") is None
        ):
            match metadata.bigquery.time_partitioning.type:
                case PartitionType.YEAR:
                    partition_template = '${{ dag_run.logical_date.strftime("%Y") }}'
                case PartitionType.MONTH:
                    partition_template = '${{ dag_run.logical_date.strftime("%Y%m") }}'
                case PartitionType.DAY:
                    # skip for the default case of daily partitioning
                    partition_template = None
                case PartitionType.HOUR:
                    partition_template = (
                        '${{ dag_run.logical_date.strftime("%Y%m%d%H") }}'
                    )
                case _:
                    raise TaskParseException(
                        f"Invalid partition type: {metadata.bigquery.time_partitioning.type}"
                    )

            if partition_template:
                task_config["table_partition_template"] = partition_template

        try:
            return copy.deepcopy(converter.structure(task_config, cls))
        except TypeError as e:
            raise TaskParseException(
                f"Invalid scheduling information format for {query_file}: {e}"
            )