private JsonNode convertToJson()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/deserialize/DorisJsonDebeziumDeserializationSchema.java [65:191]


    /**
     * Recursively converts a value into a JSON tree, guided by an optional schema.
     *
     * <p>When {@code schema} is {@code null}, the schema type is inferred from the runtime class
     * of {@code value} via {@code ConnectSchema.schemaType}. Arrays, maps and structs are
     * converted element by element through recursive calls. Maps become a JSON object when their
     * keys are strings, and otherwise a JSON array of {@code [key, value]} pairs.
     *
     * @param schema the schema describing {@code value}, or {@code null} to infer the type from
     *     the value itself
     * @param value the value to convert; may be {@code null}
     * @return the JSON representation of {@code value}; Java {@code null} when both {@code schema}
     *     and {@code value} are {@code null}; a JSON null node when {@code value} is {@code null}
     *     and the schema is optional
     * @throws DorisException if {@code value} is {@code null} for a non-optional schema, if no
     *     schema type corresponds to the value's class, if the value's runtime type does not match
     *     the schema type, or if a struct's own schema differs from {@code schema}
     */
    private JsonNode convertToJson(Schema schema, Object value) throws DorisException {
        if (value == null) {
            // Any schema is valid and we don't have a default, so treat this as an optional schema
            if (schema == null) {
                return null;
            }
            if (schema.isOptional()) {
                return JSON_NODE_FACTORY.nullNode();
            }
            throw new DorisException(
                    "Conversion error: null value for field that is required and has no default value");
        }

        try {
            final Schema.Type schemaType;
            if (schema == null) {
                // No schema supplied: fall back to the type implied by the value's Java class.
                schemaType = ConnectSchema.schemaType(value.getClass());
                if (schemaType == null) {
                    throw new DorisException(
                            "Java class "
                                    + value.getClass()
                                    + " does not have corresponding schema type.");
                }
            } else {
                schemaType = schema.type();
            }
            switch (schemaType) {
                case INT8:
                    return JSON_NODE_FACTORY.numberNode((Byte) value);
                case INT16:
                    return JSON_NODE_FACTORY.numberNode((Short) value);
                case INT32:
                    return JSON_NODE_FACTORY.numberNode((Integer) value);
                case INT64:
                    return JSON_NODE_FACTORY.numberNode((Long) value);
                case FLOAT32:
                    return JSON_NODE_FACTORY.numberNode((Float) value);
                case FLOAT64:
                    return JSON_NODE_FACTORY.numberNode((Double) value);
                case BOOLEAN:
                    return JSON_NODE_FACTORY.booleanNode((Boolean) value);
                case STRING:
                    CharSequence charSeq = (CharSequence) value;
                    return JSON_NODE_FACTORY.textNode(charSeq.toString());
                case BYTES:
                    if (value instanceof byte[]) {
                        return JSON_NODE_FACTORY.binaryNode((byte[]) value);
                    } else if (value instanceof ByteBuffer) {
                        // Copy only the readable bytes. ByteBuffer.array() would expose the whole
                        // backing array (ignoring position, limit and array offset) and throws for
                        // read-only or direct buffers. Working on a duplicate leaves the caller's
                        // buffer position untouched.
                        ByteBuffer readable = ((ByteBuffer) value).duplicate();
                        byte[] content = new byte[readable.remaining()];
                        readable.get(content);
                        return JSON_NODE_FACTORY.binaryNode(content);
                    } else if (value instanceof BigDecimal) {
                        return JSON_NODE_FACTORY.numberNode((BigDecimal) value);
                    } else {
                        throw new DorisException(
                                "Invalid type for bytes type: " + value.getClass());
                    }
                case ARRAY:
                    {
                        Collection<?> collection = (Collection<?>) value;
                        // The element schema is the same for every element; resolve it once.
                        Schema elementSchema = schema == null ? null : schema.valueSchema();
                        ArrayNode list = JSON_NODE_FACTORY.arrayNode();
                        for (Object elem : collection) {
                            list.add(convertToJson(elementSchema, elem));
                        }
                        return list;
                    }
                case MAP:
                    {
                        Map<?, ?> map = (Map<?, ?>) value;
                        // Key/value schemas are the same for every entry; resolve them once.
                        Schema keySchema = schema == null ? null : schema.keySchema();
                        Schema valueSchema = schema == null ? null : schema.valueSchema();
                        // If true, using string keys and JSON object; if false, using non-string
                        // keys and Array-encoding
                        boolean objectMode;
                        if (schema == null) {
                            // Without a schema, object mode requires every key to be a String.
                            objectMode = true;
                            for (Map.Entry<?, ?> entry : map.entrySet()) {
                                if (!(entry.getKey() instanceof String)) {
                                    objectMode = false;
                                    break;
                                }
                            }
                        } else {
                            objectMode = keySchema.type() == Schema.Type.STRING;
                        }
                        ObjectNode obj = null;
                        ArrayNode list = null;
                        if (objectMode) {
                            obj = JSON_NODE_FACTORY.objectNode();
                        } else {
                            list = JSON_NODE_FACTORY.arrayNode();
                        }
                        for (Map.Entry<?, ?> entry : map.entrySet()) {
                            JsonNode mapKey = convertToJson(keySchema, entry.getKey());
                            JsonNode mapValue = convertToJson(valueSchema, entry.getValue());

                            if (objectMode) {
                                obj.set(mapKey.asText(), mapValue);
                            } else {
                                // Non-string keys: encode each entry as a [key, value] pair.
                                list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
                            }
                        }
                        return objectMode ? obj : list;
                    }
                case STRUCT:
                    {
                        Struct struct = (Struct) value;
                        // Also rejects a null schema, since a struct's own schema never equals
                        // null.
                        if (!struct.schema().equals(schema)) {
                            throw new DorisException("Mismatching schema.");
                        }
                        ObjectNode obj = JSON_NODE_FACTORY.objectNode();
                        for (Field field : schema.fields()) {
                            obj.set(
                                    field.name(),
                                    convertToJson(
                                            field.schema(),
                                            struct.getWithoutDefault(field.name())));
                        }
                        return obj;
                    }
            }
            // Reached only for schema types the switch above does not handle.
            throw new DorisException("Couldn't convert " + value + " to JSON.");
        } catch (ClassCastException e) {
            String schemaTypeStr = (schema != null) ? schema.type().toString() : "unknown schema";
            DorisException failure =
                    new DorisException(
                            "Invalid type for " + schemaTypeStr + ": " + value.getClass());
            // Attach the cast failure as the cause so the stack trace still shows which cast
            // went wrong. initCause is used so that only the message-only constructor already
            // relied on in this method is required.
            failure.initCause(e);
            throw failure;
        }
    }