def _get_writer_schema()

in pulsar/schema/schema_avro.py [0:0]


        def _get_writer_schema(self, topic: str, version: int) -> 'dict':
            """Return the writer schema for ``topic`` at schema ``version``.

            Schemas are memoized in ``self._writer_schemas``, keyed first by
            topic and then by version. On a cache miss the schema is fetched
            through ``self._client``; when no client is configured,
            ``self._schema`` is returned instead and nothing is cached.

            Args:
                topic: Topic name used as the outer cache key and passed to
                    the client lookup.
                version: Schema version used as the inner cache key and
                    passed to the client lookup.

            Returns:
                The cached or freshly downloaded schema parsed from JSON, or
                ``self._schema`` when ``self._client`` is ``None``.

            Raises:
                RuntimeError: If the downloaded schema info does not report
                    the AVRO schema type.
            """
            # Make sure a per-topic cache exists (a missing or None entry is
            # replaced by a fresh dict) before looking up the version.
            per_topic = self._writer_schemas.get(topic)
            if per_topic is None:
                per_topic = {}
                self._writer_schemas[topic] = per_topic

            cached = per_topic.get(version)
            if cached is not None:
                return cached

            client = self._client
            if client is None:
                # No way to download anything: fall back to the local schema.
                return self._schema

            log = self._logger
            log.info('Downloading schema of %s version %d...', topic, version)
            info = client.get_schema_info(topic, version)
            log.info('Downloaded schema of %s version %d', topic, version)

            if info.schema_type() != _pulsar.SchemaType.AVRO:
                raise RuntimeError(
                    f'The schema type of topic "{topic}" and version {version} is {info.schema_type()}'
                )

            parsed = json.loads(info.schema())
            # Remember the parsed schema so later calls skip the download.
            self._writer_schemas[topic][version] = parsed
            return parsed