in jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverter.java [158:207]
public SchemaAndValue toConnectData(String topic,
byte[] value) {
Object deserialized;
try {
deserialized = deserializer.deserialize(topic, value);
} catch (SerializationException | AWSSchemaRegistryException e) {
throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
}
if (deserialized == null) {
return SchemaAndValue.NULL;
}
String jsonSchemaString = deserializer.getGlueSchemaRegistryDeserializationFacade()
.getSchemaDefinition(value);
org.everit.json.schema.Schema jsonSchema = null;
try {
JSONObject jsonSchemaObject = new JSONObject(jsonSchemaString);
jsonSchema = SchemaLoader.builder()
.schemaJson(jsonSchemaObject)
.build()
.load()
.build();
} catch (Exception e) {
throw new DataException("Failed to read JSON Schema : " + jsonSchemaString, e);
}
JsonNode jsonNode = null;
if (deserialized instanceof JsonDataWithSchema) {
JsonDataWithSchema envelope = (JsonDataWithSchema) deserialized;
String payload = envelope.getPayload();
try {
jsonNode = objectMapper.readTree(payload);
} catch (IOException e) {
throw new DataException("Failed to read JSON Payload : " + payload, e);
}
} else {
throw new DataException("JSON Deserialized data is not in envelope format.");
}
Schema connectSchema = jsonSchemaToConnectSchemaConverter.toConnectSchema(jsonSchema);
Object connectValue = jsonNodeToConnectValueConverter.toConnectValue(connectSchema, jsonNode);
SchemaAndValue schemaAndValue = new SchemaAndValue(connectSchema, connectValue);
return schemaAndValue;
}