in schema-converter/json-schema-converter/src/main/java/org/apache/rocketmq/schema/json/JsonSchemaData.java [865:1015]
public JsonNode fromConnectData(Schema schema, Object logicalValue) {
if (logicalValue == null) {
if (schema == null) {
// Any schema is valid and we don't have a default, so treat this as an optional schema
return null;
}
if (schema.getDefaultValue() != null) {
return fromConnectData(schema, schema.getDefaultValue());
}
if (schema.isOptional()) {
return JSON_NODE_FACTORY.nullNode();
}
return null;
}
Object value = logicalValue;
if (schema != null && schema.getName() != null) {
ConnectToJsonLogicalTypeConverter logicalConverter =
TO_JSON_LOGICAL_CONVERTERS.get(schema.getName());
if (logicalConverter != null) {
return logicalConverter.convert(schema, logicalValue, config);
}
}
try {
final FieldType schemaType;
if (schema == null) {
schemaType = Schema.schemaType(value.getClass());
if (schemaType == null) {
throw new ConnectException("Java class "
+ value.getClass()
+ " does not have corresponding schema type.");
}
} else {
schemaType = schema.getFieldType();
}
switch (schemaType) {
case INT8:
// Use shortValue to create a ShortNode, otherwise an IntNode will be created
return JSON_NODE_FACTORY.numberNode(((Byte) value).shortValue());
case INT16:
return JSON_NODE_FACTORY.numberNode((Short) value);
case INT32:
return JSON_NODE_FACTORY.numberNode((Integer) value);
case INT64:
return JSON_NODE_FACTORY.numberNode((Long) value);
case FLOAT32:
return JSON_NODE_FACTORY.numberNode((Float) value);
case FLOAT64:
return JSON_NODE_FACTORY.numberNode((Double) value);
case BOOLEAN:
return JSON_NODE_FACTORY.booleanNode((Boolean) value);
case STRING:
CharSequence charSeq = (CharSequence) value;
return JSON_NODE_FACTORY.textNode(charSeq.toString());
case BYTES:
if (value instanceof byte[]) {
return JSON_NODE_FACTORY.binaryNode((byte[]) value);
} else if (value instanceof ByteBuffer) {
return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array());
} else if (value instanceof BigDecimal) {
return JSON_NODE_FACTORY.numberNode((BigDecimal) value);
} else {
throw new ConnectException("Invalid type for bytes type: " + value.getClass());
}
case ARRAY: {
Collection collection = (Collection) value;
ArrayNode list = JSON_NODE_FACTORY.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.getValueSchema();
JsonNode fieldValue = fromConnectData(valueSchema, elem);
list.add(fieldValue);
}
return list;
}
case MAP: {
Map<?, ?> map = (Map<?, ?>) value;
// If true, using string keys and JSON object; if false, using non-string keys and
// Array-encoding
boolean objectMode;
if (schema == null) {
objectMode = true;
for (Map.Entry<?, ?> entry : map.entrySet()) {
if (!(entry.getKey() instanceof String)) {
objectMode = false;
break;
}
}
} else {
objectMode = schema.getKeySchema().getFieldType() == FieldType.STRING && !schema.getKeySchema()
.isOptional();
}
ObjectNode obj = null;
ArrayNode list = null;
if (objectMode) {
obj = JSON_NODE_FACTORY.objectNode();
} else {
list = JSON_NODE_FACTORY.arrayNode();
}
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.getKeySchema();
Schema valueSchema = schema == null ? null : schema.getValueSchema();
JsonNode mapKey = fromConnectData(keySchema, entry.getKey());
JsonNode mapValue = fromConnectData(valueSchema, entry.getValue());
if (objectMode) {
obj.set(mapKey.asText(), mapValue);
} else {
ObjectNode o = JSON_NODE_FACTORY.objectNode();
o.set(KEY_FIELD, mapKey);
o.set(VALUE_FIELD, mapValue);
list.add(o);
}
}
return objectMode ? obj : list;
}
case STRUCT: {
Struct struct = (Struct) value;
if (!struct.schema().equals(schema)) {
throw new ConnectException("Mismatching schema.");
}
//This handles the inverting of a union which is held as a struct, where each field is
// one of the union types.
if (JSON_TYPE_ONE_OF.equals(schema.getName())) {
for (Field field : schema.getFields()) {
Object object = struct.get(field);
if (object != null) {
return fromConnectData(field.getSchema(), object);
}
}
return fromConnectData(schema, null);
} else {
ObjectNode obj = JSON_NODE_FACTORY.objectNode();
for (Field field : schema.getFields()) {
JsonNode jsonNode = fromConnectData(field.getSchema(), struct.get(field));
if (jsonNode != null) {
obj.set(field.getName(), jsonNode);
}
}
return obj;
}
}
default:
break;
}
throw new ConnectException("Couldn't convert " + value + " to JSON.");
} catch (ClassCastException e) {
String schemaTypeStr = (schema != null) ? schema.getFieldType().toString() : "unknown schema";
throw new ConnectException("Invalid type for " + schemaTypeStr + ": " + value.getClass());
}
}