in java/json/src/main/java/com/microsoft/azure/schemaregistry/kafka/json/KafkaJsonDeserializer.java [94:128]
public T deserialize(String topic, Headers headers, byte[] data) {
    // Binds a JSON payload to the configured target type and validates the same payload
    // against the schema whose id is carried in the "schemaId" record header.
    //
    //   topic   - not used by this method.
    //   headers - record headers; the last "schemaId" header supplies the schema id.
    //   data    - JSON bytes of the record value; null yields a null result.
    //
    // Returns the bound object when schema validation reports no errors.
    // Throws JsonSerializationException when the schema id header is missing, when
    // validation reports errors, or (as the cause wrapper) for any other failure.

    // Records without a value (e.g. tombstones) arrive as null; the Kafka Deserializer
    // contract recommends returning null instead of throwing.
    if (data == null) {
        return null;
    }

    try {
        // Resolve the schema id first: it is the cheapest check, so a record without the
        // header is rejected before any parsing or registry lookup happens.
        byte[] schemaIdBytes = null;
        if (headers != null && headers.lastHeader("schemaId") != null) {
            schemaIdBytes = headers.lastHeader("schemaId").value();
        }
        if (schemaIdBytes == null) {
            throw new JsonSerializationException("Schema Id was not found in record headers", null);
        }
        // Decode with an explicit charset so the result does not depend on the JVM default.
        String schemaId = new String(schemaIdBytes, java.nio.charset.StandardCharsets.UTF_8);

        // Unknown JSON properties are ignored and fields of any visibility are bound.
        ObjectMapper mapper = new ObjectMapper().configure(
            DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.setVisibility(mapper.getVisibilityChecker().withFieldVisibility(JsonAutoDetect.Visibility.ANY));
        T dataObject = (T) mapper.readValue(data, this.config.getJsonSpecificType());

        // Fetch the schema definition by id and validate the raw JSON tree against it
        // using the V202012 spec version.
        SchemaRegistrySchema schema = this.client.getSchema(schemaId);
        JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V202012);
        JsonSchema jSchema = factory.getSchema(schema.getDefinition());
        JsonNode node = mapper.readTree(data);
        Set<ValidationMessage> errors = jSchema.validate(node);
        if (!errors.isEmpty()) {
            throw new JsonSerializationException(
                "Failed to validate Json data. Validation errors:\n" + Arrays.toString(errors.toArray()), null);
        }
        return dataObject;
    } catch (JsonSerializationException e) {
        // Already the exception type callers expect; rethrow without re-wrapping.
        throw e;
    } catch (Exception e) {
        // Any other failure (parsing, registry lookup, ...) is wrapped with its cause preserved.
        throw new JsonSerializationException("Execption occured during deserialization", e);
    }
}