in generator/dryrun.py [0:0]
def dry_run_result(self):
    """Return the dry run result.

    Two strategies are supported, selected by ``self.use_cloud_function``:

    * Cloud function: POST the query (``"SELECT 1"`` when ``self.sql`` is
      empty), project, dataset (``"telemetry"`` when ``self.dataset`` is
      empty) and, if set, the table to ``self.dry_run_url``, authenticated
      with ``self.id_token``. The decoded JSON response body is returned
      unchanged.
    * Direct client: dry run ``self.sql`` through ``self.client`` (when SQL
      is set) and, when project, dataset and table are all set, fetch that
      table's metadata.

    Returns:
        For the cloud function path, whatever JSON document the endpoint
        sent back. For the direct path, a dict with the keys ``"valid"``
        (always ``True``), ``"referencedTables"``, ``"schema"`` and
        ``"tableMetadata"``. ``None`` if anything raised along the way; the
        error is printed rather than propagated.
    """
    try:
        if self.use_cloud_function:
            json_data = {
                "query": self.sql or "SELECT 1",
                "project": self.project,
                "dataset": self.dataset or "telemetry",
            }
            if self.table:
                json_data["table"] = self.table

            request = Request(
                self.dry_run_url,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.id_token}",
                },
                data=json.dumps(json_data).encode("utf8"),
                method="POST",
            )
            # Use the response as a context manager so the underlying
            # connection is closed even if decoding the body fails.
            with urlopen(request) as response:
                return json.load(response)

        # Defaults reported when there is no SQL to dry run and/or no fully
        # qualified table to look up.
        query_schema = None
        referenced_tables = []
        table_metadata = None

        if self.sql:
            job_config = bigquery.QueryJobConfig(
                dry_run=True,
                use_query_cache=False,
                # Fixed placeholder value so queries that reference a
                # `submission_date` parameter can be dry run.
                query_parameters=[
                    bigquery.ScalarQueryParameter(
                        "submission_date", "DATE", "2019-01-01"
                    )
                ],
            )
            if self.project:
                # NOTE(review): presumably makes unqualified dataset
                # references resolve against self.project -- confirm against
                # the BigQuery connection property documentation.
                job_config.connection_properties = [
                    bigquery.ConnectionProperty(
                        "dataset_project_id", self.project
                    )
                ]

            job = self.client.query(self.sql, job_config=job_config)
            # The schema is read from the job's raw properties
            # (statistics.query.schema); falls back to {} when absent.
            query_schema = (
                job._properties.get("statistics", {})
                .get("query", {})
                .get("schema", {})
            )
            referenced_tables = [
                ref.to_api_repr() for ref in job.referenced_tables
            ]

        # Table metadata requires a fully qualified table identifier.
        if (
            self.project is not None
            and self.table is not None
            and self.dataset is not None
        ):
            table = self.client.get_table(
                f"{self.project}.{self.dataset}.{self.table}"
            )
            table_metadata = {
                "tableType": table.table_type,
                "friendlyName": table.friendly_name,
                "schema": {
                    "fields": [field.to_api_repr() for field in table.schema]
                },
            }

        return {
            "valid": True,
            "referencedTables": referenced_tables,
            "schema": query_schema,
            "tableMetadata": table_metadata,
        }
    except Exception as e:
        # Deliberate best-effort: report the failure and signal it to the
        # caller with None instead of raising.
        print(f"ERROR {e}")
        return None