def dry_run_result()

in bigquery_etl/dryrun.py [0:0]
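Dry runs the SQL for a query file, either through the deployed dry-run Cloud Function or directly with a BigQuery client, and returns the dry-run result (validity, referenced tables, schema, dataset labels, and optionally table metadata) as a dict, or None if the dry run fails.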


    def dry_run_result(self):
        """Dry run the provided SQL file."""
        if self.content:
            sql = self.content
        else:
            sql = self.get_sql()

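        # Build scalar query parameters from the scheduling metadata so that
        # parameterized queries can be dry run with placeholder values.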
        query_parameters = []
        scheduling_metadata = self.metadata.scheduling if self.metadata else {}
        if date_partition_parameter := scheduling_metadata.get(
            "date_partition_parameter", "submission_date"
        ):
            query_parameters.append(
                bigquery.ScalarQueryParameter(
                    date_partition_parameter,
                    "DATE",
                    QUERY_PARAMETER_TYPE_VALUES["DATE"],
                )
            )
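        # Remaining scheduling parameters are "name:type:value" strings
        # (the bq --parameter format); an empty type, e.g. "name::value",
        # falls back to STRING.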
        for parameter in scheduling_metadata.get("parameters", []):
            parameter_name, parameter_type, _ = parameter.strip().split(":", 2)
            parameter_type = parameter_type.upper() or "STRING"
            query_parameters.append(
                bigquery.ScalarQueryParameter(
                    parameter_name,
                    parameter_type,
                    QUERY_PARAMETER_TYPE_VALUES.get(parameter_type),
                )
            )

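        # Derive the default project and dataset from the query file path,
        # which follows the sql/<project>/<dataset>/<table>/<file> layout:
        # three directory levels up is the project, two up is the dataset.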
        project = basename(dirname(dirname(dirname(self.sqlfile))))
        dataset = basename(dirname(dirname(self.sqlfile)))
        try:
            start_time = time.time()
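            # Delegate the dry run to the deployed Cloud Function, which
            # returns its result as JSON.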
            if self.use_cloud_function:
                json_data = {
                    "project": self.project or project,
                    "dataset": self.dataset or dataset,
                    "query": sql,
                    "query_parameters": [
                        query_parameter.to_api_repr()
                        for query_parameter in query_parameters
                    ],
                }

                if self.table:
                    json_data["table"] = self.table

                r = urlopen(
                    Request(
                        self.dry_run_url,
                        headers={
                            "Content-Type": "application/json",
                            "Authorization": f"Bearer {self.id_token}",
                        },
                        data=json.dumps(json_data).encode("utf8"),
                        method="POST",
                    )
                )
                result = json.load(r)
            else:
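                # Dry run directly: a dry_run=True job config validates the
                # query and reports statistics without running it or billing
                # for it.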
                self.client.project = project
                job_config = bigquery.QueryJobConfig(
                    dry_run=True,
                    use_query_cache=False,
                    default_dataset=f"{project}.{dataset}",
                    query_parameters=query_parameters,
                )
                job = self.client.query(sql, job_config=job_config)
                try:
                    dataset_labels = self.client.get_dataset(job.default_dataset).labels
                except Exception as e:
                    # Most users do not have the bigquery.datasets.get
                    # permission in moz-fx-data-shared-prod. This should not
                    # prevent the dry run from completing, since the dataset
                    # labels are usually not required.
                    if "Permission bigquery.datasets.get denied on dataset" in str(e):
                        dataset_labels = []
                    else:
                        raise

                result = {
                    "valid": True,
                    "referencedTables": [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ],
                    "schema": (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    ),
                    "datasetLabels": dataset_labels,
                }
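                # When a specific table was requested, also attach its type,
                # friendly name, and schema to the result.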
                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    result["tableMetadata"] = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

            self.dry_run_duration = time.time() - start_time
            return result

        except Exception as e:
            print(f"{self.sqlfile!s:59} ERROR\n", e)
            return None
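
For reference, a minimal usage sketch. The DryRun constructor arguments shown here are assumptions for illustration, not the class's confirmed signature; only the use_cloud_function attribute and the dry_run_result method appear in the code above.

    # Minimal sketch -- constructor arguments are assumed for illustration.
    from bigquery_etl.dryrun import DryRun

    dry_run = DryRun(
        "sql/moz-fx-data-shared-prod/telemetry_derived/my_table_v1/query.sql",
        use_cloud_function=False,  # dry run with a local BigQuery client
    )
    result = dry_run.dry_run_result()
    if result and result.get("valid"):
        # Referenced tables are API representations with projectId,
        # datasetId, and tableId keys.
        print([t["tableId"] for t in result["referencedTables"]])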