in evalbench/databases/bigquery.py [0:0]
def _run_execute(query: str, eval_query: Optional[str] = None, rollback=False):
    """Run ``query`` (and optionally ``eval_query``) against BigQuery.

    Every ``{{dataset}}`` placeholder in both queries is replaced with
    ``self.db_name`` before execution. ``self`` is not a parameter of this
    function; it is resolved from the enclosing scope.

    Args:
        query: SQL text to execute.
        eval_query: Optional SQL text executed after ``query``. It is skipped
            when ``None`` or empty.
        rollback: When true, both queries run inside a dedicated BigQuery
            session wrapped in ``BEGIN TRANSACTION`` / ``ROLLBACK
            TRANSACTION``, and the session is aborted afterwards.

    Returns:
        A ``(result, eval_result, error)`` tuple. ``result`` and
        ``eval_result`` are whatever ``self._execute_queries`` returned
        (empty lists when the corresponding query did not complete) and
        ``error`` is the failure message, or ``None`` on success.

    Raises:
        ResourceExhaustedError: If the failure message mentions
            "resources exceeded" or "quota exceeded" (case-insensitive),
            on either the rollback or the non-rollback path.
    """
    result: List = []
    eval_result: List = []
    error = None
    failure = None  # first exception raised by any BigQuery call

    query_replaced = query.replace("{{dataset}}", self.db_name)
    eval_query_replaced = (
        eval_query.replace("{{dataset}}", self.db_name) if eval_query else None
    )

    # Set only once a session exists, so cleanup knows whether to abort it.
    conn_props = None

    def _in_session():
        # A fresh config per call, exactly as the queries were issued before.
        return QueryJobConfig(connection_properties=conn_props)

    try:
        if rollback:
            # A throwaway query is what creates the session.
            init_job = self.client.query(
                "SELECT 1;",
                job_config=QueryJobConfig(create_session=True),
            )
            init_job.result()
            session_id = init_job.session_info.session_id
            conn_props = [ConnectionProperty(key="session_id", value=session_id)]

            self.client.query(
                "BEGIN TRANSACTION;", job_config=_in_session()
            ).result()
            result = self._execute_queries(
                query_replaced, job_config=_in_session()
            )
            if eval_query:
                eval_result = self._execute_queries(
                    eval_query_replaced, job_config=_in_session()
                )
            self.client.query(
                "ROLLBACK TRANSACTION;", job_config=_in_session()
            ).result()
        else:
            result = self._execute_queries(query_replaced)
            if eval_query:
                eval_result = self._execute_queries(eval_query_replaced)
    except Exception as e:
        failure = e
    finally:
        if conn_props is not None:
            # Best-effort cleanup: a failed abort must not hide the error
            # that the queries themselves produced.
            try:
                self.client.query(
                    "CALL BQ.ABORT_SESSION();", job_config=_in_session()
                ).result()
            except Exception as abort_exc:
                if failure is None:
                    failure = abort_exc
                else:
                    print(f"Error: {abort_exc}")

    if failure is not None:
        error = str(failure)
        # Compare case-insensitively so capitalised service messages match.
        lowered = error.lower()
        if "resources exceeded" in lowered:
            raise ResourceExhaustedError(
                f"BigQuery resources exhausted: {failure}"
            ) from failure
        if "quota exceeded" in lowered:
            raise ResourceExhaustedError(
                f"BigQuery quota exceeded: {failure}"
            ) from failure
        print(f"Error: {error}" if rollback else error)

    return result, eval_result, error