in schema-converter/json-schema-converter/src/main/java/org/apache/rocketmq/schema/json/JsonSchemaData.java [326:389]
/**
 * Converts a JSON node into a Connect data value, optionally guided by a schema.
 *
 * <p>With a schema, the field type comes from {@code schema.getFieldType()}. A missing
 * ({@code null}) or JSON-null value resolves to the schema's default value when one is set,
 * otherwise to {@code null} when the value is a Java {@code null} or the schema is optional.
 *
 * <p>Without a schema, the field type is inferred from the JSON node type: booleans, strings,
 * arrays and objects (treated as maps) map directly, integral numbers become {@code INT64}
 * and all other numbers become {@code FLOAT64}.
 *
 * <p>When the schema has a name with a registered logical converter, that converter is used
 * in place of the plain type converter.
 *
 * @param schema    the schema describing the value, or {@code null} for schemaless conversion
 * @param jsonValue the JSON node to convert; may be {@code null}
 * @return the converted value, or {@code null} for absent/null input as described above
 * @throws ConnectException if an explicit JSON null is supplied for a required field that has
 *                          no default value, or if no converter exists for the resolved type
 */
public static Object toConnectData(Schema schema, JsonNode jsonValue) {
    final FieldType resolvedType;

    if (schema == null) {
        // Schemaless mode: nothing to convert when there is no node at all.
        if (jsonValue == null) {
            return null;
        }
        switch (jsonValue.getNodeType()) {
            case NULL:
                return null;
            case STRING:
                resolvedType = FieldType.STRING;
                break;
            case BOOLEAN:
                resolvedType = FieldType.BOOLEAN;
                break;
            case NUMBER:
                resolvedType = jsonValue.isIntegralNumber() ? FieldType.INT64 : FieldType.FLOAT64;
                break;
            case OBJECT:
                resolvedType = FieldType.MAP;
                break;
            case ARRAY:
                resolvedType = FieldType.ARRAY;
                break;
            default:
                // BINARY, MISSING, POJO and any other node type have no mapping;
                // the converter lookup below is performed with a null type.
                resolvedType = null;
                break;
        }
    } else {
        resolvedType = schema.getFieldType();
        final boolean valueAbsent = jsonValue == null;
        if (valueAbsent || jsonValue.isNull()) {
            final Object fallback = schema.getDefaultValue();
            if (fallback != null) {
                // any logical type conversions should already have been applied
                return fallback;
            }
            // NOTE(review): a Java-null value skips the required-field check; only an
            // explicit JSON null node can trigger the exception. Confirm this is intended.
            if (!valueAbsent && !schema.isOptional()) {
                throw new ConnectException("Invalid null value for required " + resolvedType + " field");
            }
            return null;
        }
    }

    final JsonToConnectTypeConverter plainConverter = TO_CONNECT_CONVERTERS.get(resolvedType);
    if (plainConverter == null) {
        throw new ConnectException("Unknown schema type: " + resolvedType);
    }

    // A converter registered under the schema name takes precedence over the plain one.
    if (schema != null && schema.getName() != null) {
        final JsonToConnectLogicalTypeConverter namedConverter =
            TO_CONNECT_LOGICAL_CONVERTERS.get(schema.getName());
        if (namedConverter != null) {
            return namedConverter.convert(schema, jsonValue);
        }
    }
    return plainConverter.convert(schema, jsonValue);
}