in bigquery_etl/dryrun.py [0:0]
def dry_run_result(self):
    """Dry run the provided SQL file and return the raw result payload.

    Builds the query parameters implied by the file's scheduling metadata,
    then performs the dry run either via a remote Cloud Function
    (``self.use_cloud_function``) or directly with the BigQuery client.

    Returns:
        A dict with keys such as ``valid``, ``referencedTables``, ``schema``
        and ``datasetLabels`` (plus ``tableMetadata`` when project, dataset
        and table are all explicitly set on this instance), or ``None`` if
        any exception occurred during the dry run.
    """
    # Prefer in-memory SQL content when available; otherwise read it
    # from the file via the helper.
    if self.content:
        sql = self.content
    else:
        sql = self.get_sql()
    query_parameters = []
    # Scheduling metadata determines which query parameters the dry run
    # must supply for the query to be parseable.
    scheduling_metadata = self.metadata.scheduling if self.metadata else {}
    # Unless explicitly disabled (metadata sets it to a falsy value), add
    # a DATE parameter; the parameter name defaults to "submission_date".
    if date_partition_parameter := scheduling_metadata.get(
        "date_partition_parameter", "submission_date"
    ):
        query_parameters.append(
            bigquery.ScalarQueryParameter(
                date_partition_parameter,
                "DATE",
                QUERY_PARAMETER_TYPE_VALUES["DATE"],
            )
        )
    # Extra parameters are declared as "name:type:value" strings; the
    # declared value is discarded (split(..., 2) keeps it in `_`) and a
    # per-type placeholder from QUERY_PARAMETER_TYPE_VALUES is used instead.
    for parameter in scheduling_metadata.get("parameters", []):
        parameter_name, parameter_type, _ = parameter.strip().split(":", 2)
        # An empty type segment ("name::value") falls back to STRING.
        parameter_type = parameter_type.upper() or "STRING"
        query_parameters.append(
            bigquery.ScalarQueryParameter(
                parameter_name,
                parameter_type,
                QUERY_PARAMETER_TYPE_VALUES.get(parameter_type),
            )
        )
    # Derive the default project/dataset from the file path; assumes the
    # conventional sql/<project>/<dataset>/<table>/<file>.sql layout
    # (three/two directory levels above the SQL file).
    project = basename(dirname(dirname(dirname(self.sqlfile))))
    dataset = basename(dirname(dirname(self.sqlfile)))
    try:
        start_time = time.time()
        if self.use_cloud_function:
            # Remote path: POST the query to the dry-run Cloud Function,
            # which performs the dry run with its own credentials.
            # Explicit self.project/self.dataset override the values
            # derived from the file path.
            json_data = {
                "project": self.project or project,
                "dataset": self.dataset or dataset,
                "query": sql,
                "query_parameters": [
                    query_parameter.to_api_repr()
                    for query_parameter in query_parameters
                ],
            }
            if self.table:
                json_data["table"] = self.table
            r = urlopen(
                Request(
                    self.dry_run_url,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.id_token}",
                    },
                    data=json.dumps(json_data).encode("utf8"),
                    method="POST",
                )
            )
            # The function's JSON response is returned as-is.
            result = json.load(r)
        else:
            # Local path: dry run directly against BigQuery. The client's
            # project is switched to the path-derived project before the
            # query is issued.
            self.client.project = project
            job_config = bigquery.QueryJobConfig(
                dry_run=True,
                use_query_cache=False,
                default_dataset=f"{project}.{dataset}",
                query_parameters=query_parameters,
            )
            job = self.client.query(sql, job_config=job_config)
            try:
                dataset_labels = self.client.get_dataset(job.default_dataset).labels
            except Exception as e:
                # Most users do not have bigquery.datasets.get permission in
                # moz-fx-data-shared-prod
                # This should not prevent the dry run from running since the dataset
                # labels are usually not required
                # NOTE(review): this fallback is a list while .labels is
                # normally a dict; downstream appears to rely only on
                # truthiness/iteration — confirm before changing.
                if "Permission bigquery.datasets.get denied on dataset" in str(e):
                    dataset_labels = []
                else:
                    raise e
            # Assemble a payload shaped like the Cloud Function's response
            # so callers can treat both paths uniformly.
            result = {
                "valid": True,
                "referencedTables": [
                    ref.to_api_repr() for ref in job.referenced_tables
                ],
                # The dry-run schema is only exposed via the job's raw
                # properties, hence the private-attribute access.
                "schema": (
                    job._properties.get("statistics", {})
                    .get("query", {})
                    .get("schema", {})
                ),
                "datasetLabels": dataset_labels,
            }
            # Only fetch table metadata when the full table reference was
            # provided explicitly (not path-derived).
            if (
                self.project is not None
                and self.table is not None
                and self.dataset is not None
            ):
                table = self.client.get_table(
                    f"{self.project}.{self.dataset}.{self.table}"
                )
                result["tableMetadata"] = {
                    "tableType": table.table_type,
                    "friendlyName": table.friendly_name,
                    "schema": {
                        "fields": [field.to_api_repr() for field in table.schema]
                    },
                }
        # Record wall-clock duration of the dry run for later reporting.
        self.dry_run_duration = time.time() - start_time
        return result
    except Exception as e:
        # Deliberate catch-all: any failure (bad SQL, network error,
        # permission issue) is reported on stdout and mapped to None so a
        # batch of dry runs can continue past individual failures.
        print(f"{self.sqlfile!s:59} ERROR\n", e)
        return None