in client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java [70:108]
/**
 * Deserializes an Avro binary payload into a datum of type {@code T}.
 *
 * <p>The writer schema is fetched from the schema registry. When
 * {@code useTargetVersionSchema} is set, it is looked up by subject, and
 * additionally by {@code schemaTargetVersion} unless that equals
 * {@code AvroSerdeConfig.SCHEMA_TARGET_VERSION_DEFAULT}. Otherwise the schema
 * record id is taken from the first eight bytes of the Avro {@code bytes}
 * value that leads the payload, and the schema is looked up by that id.
 *
 * @param subject      subject used for the schema registry lookup
 * @param payload      serialized bytes; may be {@code null}
 * @param readerSchema schema to decode into; when {@code null} it is obtained
 *                     from {@code getReaderSchema(writerSchema)}
 * @return the decoded datum, or {@code null} when {@code payload} is {@code null}
 * @throws SerializationException if the schema registry client is not set, the
 *                                record-id header is shorter than eight bytes,
 *                                the registry lookup fails, or decoding hits an
 *                                I/O error
 */
public T deserialize(String subject, byte[] payload, Schema readerSchema)
    throws SerializationException {
    if (schemaRegistry == null) {
        throw new SerializationException("please initialize the schema registry client first");
    }
    if (payload == null) {
        return null;
    }
    try {
        ByteArrayInputStream bais = new ByteArrayInputStream(payload);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
        GetSchemaResponse response;
        if (useTargetVersionSchema) {
            // NOTE(review): nothing is consumed from the decoder in this branch, so the
            // datum is decoded starting at the first payload byte - confirm that payloads
            // handled in this mode do not carry a record-id header.
            response = AvroSerdeConfig.SCHEMA_TARGET_VERSION_DEFAULT == schemaTargetVersion
                ? schemaRegistry.getSchemaBySubject(subject)
                : schemaRegistry.getSchemaBySubjectAndVersion(subject, schemaTargetVersion);
        } else {
            // readBytes() only reuses the supplied buffer when the encoded length fits its
            // capacity; otherwise it returns a freshly allocated one. Always read the id
            // from the returned buffer so it is never taken from an untouched scratch buffer.
            ByteBuffer header = decoder.readBytes(ByteBuffer.allocate(16));
            if (header.remaining() < Long.BYTES) {
                // Fail with the declared exception type instead of an unchecked
                // BufferUnderflowException from getLong().
                throw new SerializationException(
                    "deserialize error, schema record id header is too short: " + header.remaining() + " bytes");
            }
            long schemaRecordId = header.getLong();
            response = schemaRegistry.getSchemaByRecordId(subject, schemaRecordId);
        }
        Schema writerSchema = new Schema.Parser().parse(response.getIdl());
        if (readerSchema == null) {
            readerSchema = getReaderSchema(writerSchema);
        }
        DatumReader<T> datumReader = getDatumReader(writerSchema, readerSchema);
        // The decoder is now positioned just past whatever header was consumed above.
        return datumReader.read(null, decoder);
    } catch (RestClientException e) {
        log.warn("get schema by record id failed, maybe the schema storage service not available now", e);
        throw new SerializationException("get schema by record id failed, maybe the schema storage service not available now", e);
    } catch (IOException e) {
        log.warn("deserialize failed", e);
        throw new SerializationException("deserialize error", e);
    }
}