in schema-converter/json-schema-converter/src/main/java/org/apache/rocketmq/schema/json/JsonSchemaData.java [100:209]
static {
TO_CONNECT_CONVERTERS.put(FieldType.BOOLEAN, (schema, value) -> value.booleanValue());
TO_CONNECT_CONVERTERS.put(FieldType.INT8, (schema, value) -> (byte) value.shortValue());
TO_CONNECT_CONVERTERS.put(FieldType.INT16, (schema, value) -> value.shortValue());
TO_CONNECT_CONVERTERS.put(FieldType.INT32, (schema, value) -> value.intValue());
TO_CONNECT_CONVERTERS.put(FieldType.INT64, (schema, value) -> value.longValue());
TO_CONNECT_CONVERTERS.put(FieldType.FLOAT32, (schema, value) -> value.floatValue());
TO_CONNECT_CONVERTERS.put(FieldType.FLOAT64, (schema, value) -> value.doubleValue());
TO_CONNECT_CONVERTERS.put(FieldType.BYTES, (schema, value) -> {
try {
Object o = value.binaryValue();
if (o == null) {
o = value.decimalValue(); // decimal logical type
}
return o;
} catch (IOException e) {
throw new ConnectException("Invalid bytes field", e);
}
});
TO_CONNECT_CONVERTERS.put(FieldType.STRING, (schema, value) -> value.textValue());
TO_CONNECT_CONVERTERS.put(FieldType.ARRAY, (schema, value) -> {
Schema elemSchema = schema == null ? null : schema.getValueSchema();
ArrayList<Object> result = new ArrayList<>();
for (JsonNode elem : value) {
result.add(toConnectData(elemSchema, elem));
}
return result;
});
TO_CONNECT_CONVERTERS.put(FieldType.MAP, (schema, value) -> {
Schema keySchema = schema == null ? null : schema.getKeySchema();
Schema valueSchema = schema == null ? null : schema.getValueSchema();
Map<Object, Object> result = new HashMap<>();
if (schema == null || (keySchema.getFieldType() == FieldType.STRING && !keySchema.isOptional())) {
if (!value.isObject()) {
throw new ConnectException(
"Maps with string fields should be encoded as JSON objects, but found "
+ value.getNodeType());
}
Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
while (fieldIt.hasNext()) {
Map.Entry<String, JsonNode> entry = fieldIt.next();
result.put(entry.getKey(), toConnectData(valueSchema, entry.getValue()));
}
} else {
if (!value.isArray()) {
throw new ConnectException(
"Maps with non-string fields should be encoded as JSON array of objects, but "
+ "found "
+ value.getNodeType());
}
for (JsonNode entry : value) {
if (!entry.isObject()) {
throw new ConnectException("Found invalid map entry instead of object: "
+ entry.getNodeType());
}
if (entry.size() != 2) {
throw new ConnectException("Found invalid map entry, expected length 2 but found :" + entry
.size());
}
result.put(toConnectData(keySchema, entry.get(KEY_FIELD)),
toConnectData(valueSchema, entry.get(VALUE_FIELD))
);
}
}
return result;
});
TO_CONNECT_CONVERTERS.put(FieldType.STRUCT, (schema, value) -> {
if (schema.getName() != null && schema.getName().equals(JSON_TYPE_ONE_OF)) {
int numMatchingProperties = -1;
Field matchingField = null;
for (Field field : schema.getFields()) {
Schema fieldSchema = field.getSchema();
if (isSimpleSchema(fieldSchema, value)) {
return new Struct(schema).put(JSON_TYPE_ONE_OF + ".field." + field.getIndex(),
toConnectData(fieldSchema, value)
);
} else {
int matching = matchStructSchema(fieldSchema, value);
if (matching > numMatchingProperties) {
numMatchingProperties = matching;
matchingField = field;
}
}
}
if (matchingField != null) {
return new Struct(schema).put(
JSON_TYPE_ONE_OF + ".field." + matchingField.getIndex(),
toConnectData(matchingField.getSchema(), value)
);
}
throw new ConnectException("Did not find matching oneof field for data: " + value.toString());
} else {
if (!value.isObject()) {
throw new ConnectException("Structs should be encoded as JSON objects, but found "
+ value.getNodeType());
}
Struct result = new Struct(schema);
for (Field field : schema.getFields()) {
Object fieldValue = toConnectData(field.getSchema(), value.get(field.getName()));
if (fieldValue != null) {
result.put(field, fieldValue);
}
}
return result;
}
});
}