in transcoder/output/google_cloud/BigQueryOutputManager.py [0:0]
def _add_schema(self, schema: DatacastSchema):
bq_schema = self._get_field_list(schema.fields)
table_ref = bigquery.TableReference(self.dataset_ref, schema.name)
if self._does_table_exist(schema.name) is True:
existing_table = self.client.get_table(table_ref)
if GOOGLE_PACKAGED_SOLUTION_KEY not in existing_table.labels \
or existing_table.labels.get(GOOGLE_PACKAGED_SOLUTION_KEY, None) != GOOGLE_PACKAGED_SOLUTION_VALUE:
existing_table.labels.update(GOOGLE_PACKAGED_SOLUTION_LABEL_DICT)
try:
self.client.update_table(existing_table, ["labels"])
except Exception as err: # pylint: disable=broad-except
logging.warning("Failed to update table labels: %s", err)
if self._is_schema_equal(existing_table.schema, bq_schema) is False:
raise BigQueryTableSchemaOutOfSyncError(
f'The schema for table "{table_ref}" differs from the definition for schema "{schema.name}"')
else:
table = bigquery.Table(table_ref, schema=bq_schema)
table.labels = GOOGLE_PACKAGED_SOLUTION_LABEL_DICT
try:
self.client.create_table(table, exists_ok=True)
except Conflict as error:
# MS: 2022-10-12 Adding exists_ok to create_table call so that the Conflict is not raised
# b/153072942
# https://cloud.google.com/bigquery/docs/error-messages
logging.warning('Table conflict, already exists %s: %s', schema.name, error)
except Exception as error:
logging.error('Error creating table %s: %s', schema.name, error)
raise