in pulsar/schema/schema_avro.py [0:0]
def _get_writer_schema(self, topic: str, version: int) -> 'dict':
    """Return the writer schema for ``topic`` at ``version``.

    Resolution order:

    1. A previously stored entry in ``self._writer_schemas[topic]``.
    2. ``self._schema`` when ``self._client`` is ``None`` (that schema is
       returned as-is and is not stored under ``version``).
    3. The schema info obtained from ``self._client.get_schema_info``,
       JSON-decoded and stored in the per-topic cache before returning.

    Raises:
        RuntimeError: if the fetched schema info reports a schema type
            other than ``_pulsar.SchemaType.AVRO``.
    """
    # Ensure the per-topic bucket exists (a missing or None entry is reset).
    if self._writer_schemas.get(topic) is None:
        self._writer_schemas[topic] = {}

    cached = self._writer_schemas[topic].get(version)
    if cached is not None:
        return cached

    client = self._client
    if client is None:
        # Nothing to download with; fall back to self._schema.
        return self._schema

    self._logger.info('Downloading schema of %s version %d...', topic, version)
    schema_info = client.get_schema_info(topic, version)
    self._logger.info('Downloaded schema of %s version %d', topic, version)

    if schema_info.schema_type() == _pulsar.SchemaType.AVRO:
        parsed = json.loads(schema_info.schema())
        self._writer_schemas[topic][version] = parsed
        return parsed

    # Only AVRO schema info can be decoded here; report the actual type.
    message = (
        f'The schema type of topic "{topic}" and version {version}'
        f' is {schema_info.schema_type()}'
    )
    raise RuntimeError(message)