in 5-app-infra/3-artifact-publish/docker/cdmc/tag_engine_api/BigQueryUtils.py [0:0]
def create_history_table(self, dataset_id, table_name, fields):
    """Create the day-partitioned BigQuery table that stores tag history.

    The schema starts with five REQUIRED bookkeeping columns (event_time,
    asset_name, tag_creator_account, tag_invoker_account, job_uuid) and is
    followed by one NULLABLE column per entry in ``fields``.

    Args:
        dataset_id: Object exposing ``.table(name)``; despite the name this
            is presumably a ``bigquery.DatasetReference`` rather than a
            plain string -- TODO confirm against callers.
        table_name: Name of the table to create inside that dataset.
        fields: Iterable of dicts, each with a ``'field_id'`` (used as the
            column name) and a ``'field_type'`` (one of ``string``, ``enum``,
            ``double``, ``bool``, ``timestamp``, ``datetime``, ``richtext``).

    Returns:
        The fully qualified table id as ``"project.dataset.table"``.

    Raises:
        ValueError: If a field has a ``field_type`` with no BigQuery mapping.
            Previously this either raised ``UnboundLocalError`` (first field)
            or silently reused the preceding field's column type.
    """
    # Tag template field type -> BigQuery column type.
    bq_type_by_field_type = {
        'string': 'STRING',
        'enum': 'STRING',
        'double': 'NUMERIC',
        'bool': 'BOOLEAN',
        'timestamp': 'TIMESTAMP',
        # datetime fields should be mapped to timestamps in BQ because they
        # actually contain a timezone
        'datetime': 'TIMESTAMP',
        'richtext': 'STRING',
    }

    schema = [
        bigquery.SchemaField('event_time', 'TIMESTAMP', mode='REQUIRED'),
        bigquery.SchemaField('asset_name', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('tag_creator_account', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('tag_invoker_account', 'STRING', mode='REQUIRED'),
        bigquery.SchemaField('job_uuid', 'STRING', mode='REQUIRED'),
    ]

    for field in fields:
        col_name = field['field_id']
        field_type = field['field_type']
        try:
            col_type = bq_type_by_field_type[field_type]
        except (KeyError, TypeError):
            # Fail loudly instead of leaking the previous iteration's type
            # into this column.
            raise ValueError(
                "Unsupported field_type {!r} for field {!r}".format(field_type, col_name)
            ) from None

        # mode is always set to NULLABLE to be able to represent deleted tags
        schema.append(bigquery.SchemaField(col_name, col_type, mode='NULLABLE'))

    table = bigquery.Table(dataset_id.table(table_name), schema=schema)
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY, field="event_time")

    # exists_ok=True: a pre-existing table is not an error, so the message
    # below is printed whether or not the table was newly created.
    table = self.client.create_table(table, exists_ok=True)

    table_id = "{}.{}.{}".format(table.project, table.dataset_id, table.table_id)
    print("Created table {}".format(table_id))
    return table_id