in jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/ConnectValueToJsonNodeConverter.java [54:100]
public JsonNode convertToJson(Schema schema,
Object value) {
if (value == null) {
// Any schema is valid and we don't have a default, so treat this as an optional schema
if (schema == null) {
return JSON_NODE_FACTORY.nullNode();
}
if (schema.defaultValue() != null) {
return convertToJson(schema, schema.defaultValue());
}
if (schema.isOptional()) {
return JSON_NODE_FACTORY.nullNode();
}
throw new DataException("Conversion error: null value for field that is required and has no default value");
}
if (schema != null && schema.name() != null) {
TypeConverter logicalConverter = typeConverterFactory.get(schema.name());
if (logicalConverter != null) {
return logicalConverter.toJson(schema, value, jsonSchemaDataConfig);
}
}
final Schema.Type schemaType;
try {
if (schema == null) {
schemaType = ConnectSchema.schemaType(value.getClass());
if (schemaType == null) {
throw new DataException(
"Java class " + value.getClass() + " does not have corresponding schema type.");
}
} else {
schemaType = schema.type();
}
TypeConverter typeConverter = typeConverterFactory.get(schemaType);
if (typeConverter != null) {
return typeConverter.toJson(schema, value, jsonSchemaDataConfig);
} else {
throw new DataException("Couldn't convert " + value + " to JSON.");
}
} catch (ClassCastException e) {
String schemaTypeStr = (schema != null) ? schema.type()
.toString() : "unknown schema";
throw new DataException("Invalid type for " + schemaTypeStr + ": " + value.getClass());
}
}