in transcoder/output/google_cloud/PubSubOutputManager.py [0:0]
def _add_schema(self, schema: DatacastSchema):  # pylint: disable=too-many-locals
    """Register ``schema`` and ensure its Pub/Sub schema and topic are usable.

    The method:

    1. Builds an Avro record definition from ``schema.fields`` (via
       ``self._get_field_list``) and stores it in ``self.avro_schemas``
       under ``schema.name``.
    2. When ``self.create_schema_enforcing_topics`` is ``True``, checks the
       Pub/Sub schema named after ``schema.name``: an existing schema must
       have the same definition string as the generated one, a missing
       schema is created.
    3. Looks up the topic named after ``schema.name``. An existing topic is
       validated (label, encoding, schema path); a missing topic is created,
       with schema settings only when schema enforcement is enabled.

    Args:
        schema: The schema whose ``name`` is used as both the Pub/Sub schema
            id and the topic id, and whose ``fields`` become the Avro fields.

    Raises:
        PubSubTopicSchemaOutOfSyncError: If the existing Pub/Sub schema
            definition differs from the generated one, or an existing
            topic's encoding or schema path differs from what is expected.
        InvalidArgument: Logged and re-raised when topic creation is
            rejected.
    """
    schema_id = schema.name
    topic_id = schema.name
    topic_path = self.publisher.topic_path(self.project_id, topic_id)
    schema_path = self.schema_client.schema_path(self.project_id, schema.name)
    enforce_schema = self.create_schema_enforcing_topics is True
    expected_encoding = Encoding.BINARY if self.is_binary_encoded is True else Encoding.JSON

    _fields = self._get_field_list(schema.fields)
    avsc_schema = {'type': 'record', 'namespace': 'sbeMessage', 'name': schema.name, 'fields': _fields}
    self.avro_schemas[schema.name] = avsc_schema

    if enforce_schema:
        jsoned_avsc_schema = json.dumps(avsc_schema)

        if self._does_topic_schema_exist(schema_path) is True:
            result = self._get_schema_avro(schema_path)
            # NOTE(review): definitions are compared as raw strings, so a
            # purely cosmetic difference (whitespace, key order) is reported
            # as drift.
            if jsoned_avsc_schema != result.definition:
                raise PubSubTopicSchemaOutOfSyncError(
                    f'The schema "{schema_path}" differs from the definition for schema "{schema.name}"\nGenerated: {jsoned_avsc_schema}\nExisting: {result.definition}')
        else:
            topic_schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=jsoned_avsc_schema)
            try:
                result = self.schema_client.create_schema(
                    request={"parent": self.project_path, "schema": topic_schema, "schema_id": schema_id}
                )
                logging.debug('Created a schema using an Avro schema:\n%s', result)
            except AlreadyExists:
                # The schema appeared between the existence check and the
                # create call; treated as success.
                logging.debug('Schema %s already exists.', schema_id)

    _existing_topic = self._get_topic(topic_path)
    if _existing_topic is not None:
        self._check_existing_label(_existing_topic)
        schema_settings = _existing_topic.schema_settings

        # With enforcement disabled, the creation path below attaches no
        # schema settings, so a topic produced by an earlier run has none.
        # Validating it against an expected encoding/schema would reject the
        # method's own output; skip validation for schema-less topics in
        # that mode. A topic that does carry a schema is still validated.
        # NOTE(review): relies on unset schema settings reading back with an
        # empty ``schema`` path -- confirm against the Pub/Sub Topic message.
        if not enforce_schema and not schema_settings.schema:
            return

        if expected_encoding != schema_settings.encoding:
            raise PubSubTopicSchemaOutOfSyncError(f'The topic "{_existing_topic.name}" has an encoding that '
                                                  f'differs from the '
                                                  f'runtime setting of {str(expected_encoding)}')
        if schema_path != schema_settings.schema:
            raise PubSubTopicSchemaOutOfSyncError(f'The topic "{_existing_topic.name}" has an unexpected schema '
                                                  f'path of "{schema_settings.schema}"')
        return

    request_dict = {
        "name": topic_path,
        "labels": GOOGLE_PACKAGED_SOLUTION_LABEL_DICT
    }
    if enforce_schema:
        request_dict["schema_settings"] = {
            "schema": schema_path,
            "encoding": expected_encoding}

    try:
        response = self.publisher.create_topic(request=request_dict)
        logging.debug('Created a topic:\n%s', response)
    except AlreadyExists:
        # The topic appeared between the lookup and the create call.
        logging.debug('Topic %s already exists.', topic_id)
    except InvalidArgument as ex:
        logging.error(ex)
        raise