in retail/recommendation-system/bqml-mlops/part_2/pipeline.py [0:0]
def run_bigquery_ddl(project_id: str, query_string: str, location: str) -> NamedTuple(
'DDLOutput', [('created_table', str), ('query', str)]):
"""
Runs BigQuery query and returns a table/model name
"""
print(query_string)
from google.cloud import bigquery
from google.api_core.future import polling
from google.cloud import bigquery
from google.cloud.bigquery import retry as bq_retry
bqclient = bigquery.Client(project=project_id, location=location)
job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
job._retry = polling.DEFAULT_RETRY
while job.running():
from time import sleep
sleep(0.1)
print('Running ...')
tblname = job.ddl_target_table
tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
print('{} created in {}'.format(tblname, job.ended - job.started))
from collections import namedtuple
result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
return result_tuple(tblname, query_string)