in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverter.java [141:176]
/**
 * Converts an Avro-encoded record value into Connect data.
 *
 * <p>The payload is deserialized with the configured deserializer, using the value of the
 * record's last {@code content-type} header as the message content type. The second
 * {@code '+'}-separated segment of that content type is used as the schema id to look up the
 * writer schema in the schema registry; the schema definition is then parsed and used to
 * convert the deserialized object.
 *
 * @param topic   the topic the record was read from (not used by this method)
 * @param headers the record headers; the last {@code content-type} header is read
 * @param value   the serialized record value; {@code null} yields {@link SchemaAndValue#NULL}
 * @return the Connect schema and value for the record
 * @throws DataException if Avro deserialization fails, if the content type does not carry a
 *                       schema id, or if the schema registry returns no schema for that id
 */
public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
    // A null payload (e.g. a tombstone record) has nothing to deserialize. Return the null
    // SchemaAndValue rather than passing null on to BinaryData.fromBytes.
    if (value == null) {
        return SchemaAndValue.NULL;
    }

    String contentTypeString = "";
    try {
        MessageWithMetadata message = new MessageWithMetadata();
        message.setBodyAsBinaryData(BinaryData.fromBytes(value));

        Header contentTypeHeader = headers.lastHeader("content-type");
        // A header with a null value is treated the same as a missing header.
        if (contentTypeHeader != null && contentTypeHeader.value() != null) {
            // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
            contentTypeString =
                new String(contentTypeHeader.value(), java.nio.charset.StandardCharsets.UTF_8);
            message.setContentType(contentTypeString);
        }

        Object deserializedMessage = deserializer.deserializeMessageData(message,
            TypeReference.createInstance(this.avroConverterConfig.getAvroSpecificType()));

        String[] splitSchemaId = contentTypeString.split("\\+");
        if (splitSchemaId.length < 2) {
            // Report the raw content type: the split result may be an empty array (e.g. for "+"),
            // so indexing into it here is not safe.
            throw new DataException("Failed to parse schema id " + contentTypeString);
        }
        String schemaId = splitSchemaId[1];

        SchemaRegistrySchema srSchema = schemaRegistryClient.getSchema(schemaId).block();
        if (srSchema == null) {
            // block() may yield null when the lookup completes without a value.
            throw new DataException("No schema found in schema registry for schema id " + schemaId);
        }

        // Convert Avro object to Connect SchemaAndValue
        AvroConverterUtils utils = new AvroConverterUtils();
        return utils.toConnectData(new Parser().parse(srSchema.getDefinition()), deserializedMessage);
    } catch (SchemaRegistryApacheAvroException e) {
        throw new DataException("Failed to deserialize Avro data: ", e);
    }
}