in src/generate_query.py [0:0]
def create_cdc_table(raw_table_name, cdc_table_name, partition_details,
                     cluster_details):
    """Makes sure the CDC table exists, building it from the RAW table schema.

    The CDC table is looked up first; when it is found, a message is printed
    and nothing else happens. When the lookup raises NotFound, the schema of
    the RAW table is fetched, every field whose name appears in
    _CDC_EXCLUDED_COLUMN_LIST is dropped, the optional partitioning and
    clustering settings are applied, and the CDC table is created.

    Args:
        raw_table_name: Full table name of raw table (dataset.table_name).
        cdc_table_name: Full table name of cdc table (dataset.table_name).
        partition_details: Partition details. Ignored when falsy. Keys read
            here: 'partition_type' and 'column', plus 'time_grain' when the
            type is 'time', otherwise 'integer_range_bucket' (with 'start',
            'end' and 'interval').
        cluster_details: Cluster details. Ignored when falsy. Key read here:
            'columns'.

    Raises:
        NotFound: Only the CDC table lookup is guarded; a NotFound raised
            while reading the RAW table is not caught here and propagates.
    """
    try:
        client.get_table(cdc_table_name)
    except NotFound:
        # The CDC table is missing: derive its schema from the RAW table.
        source_fields = client.get_table(raw_table_name).schema
        kept_fields = []
        for column in source_fields:
            if column.name in _CDC_EXCLUDED_COLUMN_LIST:
                continue
            kept_fields.append(column)

        new_table = bigquery.Table(cdc_table_name, schema=kept_fields)

        # Partitioning is optional and is validated against the kept fields.
        if partition_details:
            validate_partition_columns(partition_details, kept_fields)
            is_time_partitioned = partition_details['partition_type'] == 'time'
            if not is_time_partitioned:
                # Every partition type other than 'time' is handled as an
                # integer range partition.
                bucket = partition_details['integer_range_bucket']
                range_start = bucket['start']
                range_end = bucket['end']
                range_step = bucket['interval']
                partition_column = partition_details['column']
                bucket_range = bigquery.PartitionRange(start=range_start,
                                                       end=range_end,
                                                       interval=range_step)
                new_table.range_partitioning = bigquery.RangePartitioning(
                    field=partition_column, range_=bucket_range)
            else:
                grain = partition_details['time_grain']
                partition_column = partition_details['column']
                grain_type = _TIME_PARTITION_GRAIN_DICT[grain]
                new_table.time_partitioning = bigquery.TimePartitioning(
                    field=partition_column, type_=grain_type)

        # Clustering is optional as well.
        if cluster_details:
            validate_cluster_columns(cluster_details, kept_fields)
            new_table.clustering_fields = cluster_details['columns']

        client.create_table(new_table)
        print(f'Created table {cdc_table_name}.')
    else:
        print(f'Table {cdc_table_name} already exists. Not creating it again.')