in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [506:620]
/**
 * Converts a value into the object form used for JSON output, recursing into
 * arrays, maps and structs.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>A {@code null} value yields {@code null} when there is no schema, the converted
 *       schema default when one is set, {@code null} when the schema is optional, and an
 *       exception otherwise.</li>
 *   <li>If the schema has a name registered in {@code LOGICAL_CONVERTERS}, that logical
 *       converter produces the result.</li>
 *   <li>Otherwise the value is converted according to the schema's field type, or, when no
 *       schema is given, the type derived from the value's Java class.</li>
 * </ol>
 *
 * <p>Maps become a {@code Map<String, Object>} when the keys are strings (per the key schema,
 * or per inspection of every key when schemaless); otherwise they become a {@code JSONArray}
 * of two-element {@code [key, value]} arrays.
 *
 * @param schema the schema describing {@code value}; may be {@code null} for schemaless data
 * @param value  the value to convert; may be {@code null}
 * @return the converted value, or {@code null} as described above
 * @throws ConnectException if a required value without default is {@code null}, the value's
 *                          class has no corresponding schema type, the value does not match
 *                          the schema type, a struct's schema differs from {@code schema},
 *                          or the schema type is not handled
 */
private Object convertToJson(Schema schema, Object value) {
    if (value == null) {
        if (schema == null) {
            return null;
        }
        if (schema.getDefaultValue() != null) {
            return convertToJson(schema, schema.getDefaultValue());
        }
        if (schema.isOptional()) {
            return null;
        }
        throw new ConnectException("Conversion error: null value for field that is required and has no default value");
    }

    // value is guaranteed non-null from here on, so the logical converter can be called directly.
    if (schema != null && schema.getName() != null) {
        LogicalTypeConverter logicalConverter = LOGICAL_CONVERTERS.get(schema.getName());
        if (logicalConverter != null) {
            return logicalConverter.toJson(schema, value, converterConfig);
        }
    }

    try {
        final FieldType schemaType;
        if (schema == null) {
            schemaType = Schema.schemaType(value.getClass());
            if (schemaType == null) {
                throw new ConnectException("Java class " + value.getClass() + " does not have corresponding schema type.");
            }
        } else {
            schemaType = schema.getFieldType();
        }

        switch (schemaType) {
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case FLOAT32:
            case FLOAT64:
            case BOOLEAN:
            case STRING:
                return value;
            case BYTES: {
                if (value instanceof byte[]) {
                    return value;
                }
                if (value instanceof ByteBuffer) {
                    ByteBuffer buffer = (ByteBuffer) value;
                    if (buffer.hasArray()) {
                        return buffer.array();
                    }
                    // Direct and read-only buffers have no accessible backing array and
                    // array() would throw; copy the remaining bytes through a duplicate so
                    // the caller's buffer position is left untouched.
                    ByteBuffer view = buffer.duplicate();
                    byte[] copy = new byte[view.remaining()];
                    view.get(copy);
                    return copy;
                }
                throw new ConnectException("Invalid type for bytes type: " + value.getClass());
            }
            case ARRAY: {
                Collection<?> collection = (Collection<?>) value;
                // The element schema is the same for every element; resolve it once.
                Schema elementSchema = schema == null ? null : schema.getValueSchema();
                List<Object> list = new ArrayList<>(collection.size());
                for (Object elem : collection) {
                    list.add(convertToJson(elementSchema, elem));
                }
                return list;
            }
            case MAP: {
                Map<?, ?> map = (Map<?, ?>) value;
                // Key and value schemas are the same for every entry; resolve them once.
                Schema keySchema = schema == null ? null : schema.getKeySchema();
                Schema valueSchema = schema == null ? null : schema.getValueSchema();

                // Object mode (plain JSON object) is only possible when all keys are strings.
                boolean objectMode;
                if (schema == null) {
                    objectMode = true;
                    for (Map.Entry<?, ?> entry : map.entrySet()) {
                        if (!(entry.getKey() instanceof String)) {
                            objectMode = false;
                            break;
                        }
                    }
                } else {
                    objectMode = keySchema.getFieldType() == FieldType.STRING;
                }

                JSONArray resultArray = new JSONArray();
                Map<String, Object> resultMap = new HashMap<>();
                for (Map.Entry<?, ?> entry : map.entrySet()) {
                    Object mapKey = convertToJson(keySchema, entry.getKey());
                    Object mapValue = convertToJson(valueSchema, entry.getValue());
                    if (objectMode) {
                        resultMap.put((String) mapKey, mapValue);
                    } else {
                        // Non-string keys cannot be JSON object keys; emit [key, value] pairs.
                        JSONArray entryArray = new JSONArray();
                        entryArray.add(0, mapKey);
                        entryArray.add(1, mapValue);
                        resultArray.add(entryArray);
                    }
                }
                return objectMode ? resultMap : resultArray;
            }
            case STRUCT: {
                Struct struct = (Struct) value;
                if (!struct.schema().equals(schema)) {
                    throw new ConnectException("Mismatching schema.");
                }
                // LinkedHashMap keeps the fields in the order the struct's schema lists them.
                JSONObject obj = new JSONObject(new LinkedHashMap<String, Object>());
                for (Field field : struct.schema().getFields()) {
                    obj.put(field.getName(), convertToJson(field.getSchema(), struct.get(field)));
                }
                return obj;
            }
        }
        // Reached only for a schema type that has no case above.
        throw new ConnectException("Couldn't convert " + value + " to JSON.");
    } catch (ClassCastException e) {
        // A failed cast above means the runtime value does not match the declared schema type.
        String schemaTypeStr = (schema != null) ? schema.getFieldType().toString() : "unknown schema";
        throw new ConnectException("Invalid type for " + schemaTypeStr + ": " + value.getClass());
    }
}