def _add_schema()

in transcoder/output/google_cloud/PubSubOutputManager.py [0:0]


    def _add_schema(self, schema: DatacastSchema):  # pylint: disable=too-many-locals
        """Register the Avro schema for ``schema`` and make sure its topic exists.

        The generated Avro record definition is always cached in
        ``self.avro_schemas`` under ``schema.name``. When
        ``self.create_schema_enforcing_topics`` is True, the Pub/Sub schema
        resource is compared with the generated definition if it already
        exists, or created otherwise. Finally the topic named after the schema
        is validated if it already exists, or created (with schema settings
        only when schema enforcement is enabled).

        Args:
            schema: Schema whose ``name`` is used as both the Pub/Sub schema id
                and the topic id, and whose ``fields`` are converted into the
                Avro field list.

        Raises:
            PubSubTopicSchemaOutOfSyncError: If an existing Pub/Sub schema
                definition differs from the generated one, or if schema
                enforcement is enabled and an existing topic has a different
                encoding or schema path than expected.
            InvalidArgument: Re-raised (after logging) when the topic creation
                request is rejected.
        """
        schema_id = schema.name
        topic_id = schema.name
        enforce_schema = self.create_schema_enforcing_topics is True

        topic_path = self.publisher.topic_path(self.project_id, topic_id)
        schema_path = self.schema_client.schema_path(self.project_id, schema_id)

        _fields = self._get_field_list(schema.fields)
        avsc_schema = {'type': 'record', 'namespace': 'sbeMessage', 'name': schema.name, 'fields': _fields}
        self.avro_schemas[schema.name] = avsc_schema

        if enforce_schema:
            jsoned_avsc_schema = json.dumps(avsc_schema)

            if self._does_topic_schema_exist(schema_path) is True:
                # The schema resource is already there: it must match what we
                # would have generated, otherwise published messages would be
                # validated against a stale definition.
                existing_schema = self._get_schema_avro(schema_path)
                if jsoned_avsc_schema != existing_schema.definition:
                    raise PubSubTopicSchemaOutOfSyncError(
                        f'The schema "{schema_path}" differs from the definition for schema "{schema.name}"'
                        f'\nGenerated: {jsoned_avsc_schema}\nExisting: {existing_schema.definition}')
            else:
                topic_schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=jsoned_avsc_schema)
                try:
                    result = self.schema_client.create_schema(
                        request={"parent": self.project_path, "schema": topic_schema, "schema_id": schema_id}
                    )
                    logging.debug('Created a schema using an Avro schema:\n%s', result)
                except AlreadyExists:
                    # The schema appeared between the existence check and the
                    # create call; treated as success.
                    logging.debug('Schema %s already exists.', schema_id)

        # Computed once and shared by the validation and creation paths.
        expected_encoding = Encoding.BINARY if self.is_binary_encoded is True else Encoding.JSON

        _existing_topic = self._get_topic(topic_path)
        if _existing_topic is not None:
            self._check_existing_label(_existing_topic)

            # Only validate schema settings when this manager is responsible
            # for schema-enforcing topics. Topics created below without
            # enforcement carry no schema settings, so checking them
            # unconditionally would reject topics this method created itself.
            if enforce_schema:
                schema_settings = _existing_topic.schema_settings

                if expected_encoding != schema_settings.encoding:
                    raise PubSubTopicSchemaOutOfSyncError(f'The topic "{_existing_topic.name}" has an encoding that '
                                                          f'differs from the '
                                                          f'runtime setting of {str(expected_encoding)}')

                if schema_path != schema_settings.schema:
                    raise PubSubTopicSchemaOutOfSyncError(
                        f'The topic "{_existing_topic.name}" has an unexpected schema '
                        f'path of "{schema_settings.schema}"')
        else:
            request_dict = {
                "name": topic_path,
                "labels": GOOGLE_PACKAGED_SOLUTION_LABEL_DICT
            }

            if enforce_schema:
                request_dict["schema_settings"] = {
                    "schema": schema_path,
                    "encoding": expected_encoding}

            try:
                response = self.publisher.create_topic(request=request_dict)
                logging.debug('Created a topic:\n%s', response)
            except AlreadyExists:
                # Lost a creation race with another writer; the topic is there.
                logging.debug('Topic %s already exists.', topic_id)
            except InvalidArgument as ex:
                logging.error(ex)
                raise