in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java [65:191]
private JsonNode convertToJson(Schema schema, Object value) throws DorisException {
if (value == null) {
// Any schema is valid and we don't have a default, so treat this as an optional schema
if (schema == null) {
return null;
}
if (schema.isOptional()) {
return JSON_NODE_FACTORY.nullNode();
}
throw new DorisException(
"Conversion error: null value for field that is required and has no default value");
}
try {
final Schema.Type schemaType;
if (schema == null) {
schemaType = ConnectSchema.schemaType(value.getClass());
if (schemaType == null) {
throw new DorisException(
"Java class "
+ value.getClass()
+ " does not have corresponding schema type.");
}
} else {
schemaType = schema.type();
}
switch (schemaType) {
case INT8:
return JSON_NODE_FACTORY.numberNode((Byte) value);
case INT16:
return JSON_NODE_FACTORY.numberNode((Short) value);
case INT32:
return JSON_NODE_FACTORY.numberNode((Integer) value);
case INT64:
return JSON_NODE_FACTORY.numberNode((Long) value);
case FLOAT32:
return JSON_NODE_FACTORY.numberNode((Float) value);
case FLOAT64:
return JSON_NODE_FACTORY.numberNode((Double) value);
case BOOLEAN:
return JSON_NODE_FACTORY.booleanNode((Boolean) value);
case STRING:
CharSequence charSeq = (CharSequence) value;
return JSON_NODE_FACTORY.textNode(charSeq.toString());
case BYTES:
if (value instanceof byte[]) {
return JSON_NODE_FACTORY.binaryNode((byte[]) value);
} else if (value instanceof ByteBuffer) {
return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array());
} else if (value instanceof BigDecimal) {
return JSON_NODE_FACTORY.numberNode((BigDecimal) value);
} else {
throw new DorisException(
"Invalid type for bytes type: " + value.getClass());
}
case ARRAY:
{
Collection<?> collection = (Collection<?>) value;
ArrayNode list = JSON_NODE_FACTORY.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode fieldValue = convertToJson(valueSchema, elem);
list.add(fieldValue);
}
return list;
}
case MAP:
{
Map<?, ?> map = (Map<?, ?>) value;
// If true, using string keys and JSON object; if false, using non-string
// keys and Array-encoding
boolean objectMode;
if (schema == null) {
objectMode = true;
for (Map.Entry<?, ?> entry : map.entrySet()) {
if (!(entry.getKey() instanceof String)) {
objectMode = false;
break;
}
}
} else {
objectMode = schema.keySchema().type() == Schema.Type.STRING;
}
ObjectNode obj = null;
ArrayNode list = null;
if (objectMode) {
obj = JSON_NODE_FACTORY.objectNode();
} else {
list = JSON_NODE_FACTORY.arrayNode();
}
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode mapKey = convertToJson(keySchema, entry.getKey());
JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
if (objectMode) {
obj.set(mapKey.asText(), mapValue);
} else {
list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
}
}
return objectMode ? obj : list;
}
case STRUCT:
{
Struct struct = (Struct) value;
if (!struct.schema().equals(schema)) {
throw new DorisException("Mismatching schema.");
}
ObjectNode obj = JSON_NODE_FACTORY.objectNode();
for (Field field : schema.fields()) {
obj.set(
field.name(),
convertToJson(
field.schema(),
struct.getWithoutDefault(field.name())));
}
return obj;
}
}
throw new DorisException("Couldn't convert " + value + " to JSON.");
} catch (ClassCastException e) {
String schemaTypeStr = (schema != null) ? schema.type().toString() : "unknown schema";
throw new DorisException("Invalid type for " + schemaTypeStr + ": " + value.getClass());
}
}