in common/materializer/generate_assets.py [0:0]
def create_table(bq_client: bigquery.Client, full_table_name: str,
                 description: typing.Optional[str], sql_str: str,
                 table_setting: dict) -> None:
    """Creates empty BQ Reporting table.

    The schema of the new table is derived from `sql_str`: the query is
    materialized with zero rows (`WHERE FALSE`) into a helper table named
    `<full_table_name>_temp`, the schema of that helper table is read back,
    and the final table is then created through the `create_table` API with
    the optional partition / cluster settings and description applied.

    NOTE: Any existing table named `<full_table_name>_temp` is deleted
    before the helper table is created.

    Args:
        bq_client: BigQuery client used for every API call made here.
        full_table_name: Name of the table to create, in the form accepted
            by `bigquery.Table`.
        description: Optional table description. Skipped when None or empty.
        sql_str: Query whose result columns define the table schema.
        table_setting: Table settings. The optional keys
            "partition_details" and "cluster_details" are read from it.

    Raises:
        Exception: Any error raised by the BigQuery client calls or by the
            partition / cluster helpers is propagated unchanged.
    """

    # Steps to create table:
    # a. Use core_sql to create a "temporary" table, and use it to get schema.
    # b. Add partition and cluster clauses.
    # c. Create final table using above two.

    # NOTE: The other option is to create the table directly using
    # "CREATE TABLE" DDL, but applying partition and cluster clauses is
    # more complex.

    # Create a temp table using table create query to get schema.
    # -------------------------------------------------------------
    # NOTE: We can't create BQ temp table using `create_table` API call.
    # Hence, creating a regular table as temp table.
    temp_table_name = full_table_name + "_temp"
    bq_client.delete_table(temp_table_name, not_found_ok=True)
    logging.info("Creating temporary table '%s'", temp_table_name)
    # The expiration option is a safety net: if the cleanup below cannot run,
    # the helper table still expires 12 hours after creation.
    indented_sql = textwrap.indent(sql_str, " ")
    temp_table_sql = (f"CREATE TABLE `{temp_table_name}` "
                      "OPTIONS(expiration_timestamp=TIMESTAMP_ADD("
                      "CURRENT_TIMESTAMP(), INTERVAL 12 HOUR))"
                      " AS (\n SELECT * FROM (\n"
                      f"{indented_sql}\n)\n"
                      " WHERE FALSE\n)")
    logging.debug("temporary table sql = '%s'", temp_table_sql)
    # Block until the DDL job finishes; a failed job raises here, before any
    # cleanup is needed.
    bq_client.query(temp_table_sql).result()
    logging.info("Temporary table created.")

    # From this point on the temp table exists, so everything that follows is
    # guarded by `finally` to make sure it is removed on every exit path.
    try:
        table_schema = bq_client.get_table(temp_table_name).schema
        logging.debug("Table schema = \n'%s'", table_schema)

        # Create final actual table.
        # -------------------------
        logging.info("Creating actual table '%s'", full_table_name)
        table = bigquery.Table(full_table_name, schema=table_schema)

        # Add partition and cluster details.
        partition_details = table_setting.get("partition_details")
        if partition_details:
            table = bq_materializer.add_partition_to_table_def(
                table, partition_details)

        cluster_details = table_setting.get("cluster_details")
        if cluster_details:
            table = bq_materializer.add_cluster_to_table_def(
                table, cluster_details)

        # Add optional description
        if description:
            table.description = description

        bq_client.create_table(table)
        logging.info("Created table '%s'", full_table_name)
    finally:
        # Cleanup - remove temporary table
        bq_client.delete_table(temp_table_name, not_found_ok=True)
        # Verify the deletion by looking the table up again: NotFound means
        # the table is gone, a successful lookup means it is still there.
        try:
            bq_client.get_table(temp_table_name)
        except NotFound:
            logging.info("Deleted temp table = '%s'", temp_table_name)
        else:
            logging.warning(
                "⚠️ Couldn't delete temporary table `%s`. "
                "Please delete it manually. ⚠️", temp_table_name)