public JsonNode fromConnectData()

in schema-converter/json-schema-converter/src/main/java/org/apache/rocketmq/schema/json/JsonSchemaData.java [865:1015]


    /**
     * Converts a Connect value into its {@link JsonNode} representation.
     *
     * <p>Null values are resolved first: with no schema the result is {@code null}; otherwise the
     * schema's default value is converted if one exists, an optional schema yields a JSON null
     * node, and a required schema without a default yields {@code null}.
     *
     * <p>If the schema has a name registered in {@code TO_JSON_LOGICAL_CONVERTERS}, that logical
     * converter produces the result. Otherwise the value is converted according to the schema's
     * field type, or according to the type inferred from the value's Java class when no schema
     * is given.
     *
     * <p>Maps are written as a JSON object when the key schema is a required string (or, without
     * a schema, when every key is a {@link String}); otherwise they are written as an array of
     * objects holding {@code KEY_FIELD} and {@code VALUE_FIELD} entries. A struct whose schema is
     * named {@code JSON_TYPE_ONE_OF} is unwrapped to the conversion of its first non-null field.
     *
     * @param schema       schema describing {@code logicalValue}; may be {@code null}
     * @param logicalValue the value to convert; may be {@code null}
     * @return the converted node, or {@code null} when the value is {@code null} and there is
     *         either no schema or a required schema without a default value
     * @throws ConnectException if the value's Java class has no corresponding schema type, the
     *                          value does not match the schema's type, a struct carries a schema
     *                          different from {@code schema}, or a map key converts to
     *                          {@code null} while the map is encoded as a JSON object
     */
    public JsonNode fromConnectData(Schema schema, Object logicalValue) {
        if (logicalValue == null) {
            if (schema == null) {
                // Any schema is valid and we don't have a default, so treat this as an optional schema
                return null;
            }
            if (schema.getDefaultValue() != null) {
                return fromConnectData(schema, schema.getDefaultValue());
            }
            if (schema.isOptional()) {
                return JSON_NODE_FACTORY.nullNode();
            }
            // Required schema without a default: callers (e.g. the struct branch) skip null results.
            return null;
        }

        if (schema != null && schema.getName() != null) {
            ConnectToJsonLogicalTypeConverter logicalConverter =
                    TO_JSON_LOGICAL_CONVERTERS.get(schema.getName());
            if (logicalConverter != null) {
                return logicalConverter.convert(schema, logicalValue, config);
            }
        }

        try {
            final FieldType schemaType;
            if (schema == null) {
                schemaType = Schema.schemaType(logicalValue.getClass());
                if (schemaType == null) {
                    throw new ConnectException("Java class "
                            + logicalValue.getClass()
                            + " does not have corresponding schema type.");
                }
            } else {
                schemaType = schema.getFieldType();
            }
            switch (schemaType) {
                case INT8:
                    // Use shortValue to create a ShortNode, otherwise an IntNode will be created
                    return JSON_NODE_FACTORY.numberNode(((Byte) logicalValue).shortValue());
                case INT16:
                    return JSON_NODE_FACTORY.numberNode((Short) logicalValue);
                case INT32:
                    return JSON_NODE_FACTORY.numberNode((Integer) logicalValue);
                case INT64:
                    return JSON_NODE_FACTORY.numberNode((Long) logicalValue);
                case FLOAT32:
                    return JSON_NODE_FACTORY.numberNode((Float) logicalValue);
                case FLOAT64:
                    return JSON_NODE_FACTORY.numberNode((Double) logicalValue);
                case BOOLEAN:
                    return JSON_NODE_FACTORY.booleanNode((Boolean) logicalValue);
                case STRING:
                    CharSequence charSeq = (CharSequence) logicalValue;
                    return JSON_NODE_FACTORY.textNode(charSeq.toString());
                case BYTES:
                    if (logicalValue instanceof byte[]) {
                        return JSON_NODE_FACTORY.binaryNode((byte[]) logicalValue);
                    } else if (logicalValue instanceof ByteBuffer) {
                        // Copy only the remaining bytes. ByteBuffer.array() would expose the whole
                        // backing array (ignoring position, limit and array offset) and throws for
                        // read-only or direct buffers. Working on a duplicate leaves the caller's
                        // position untouched.
                        ByteBuffer buffer = ((ByteBuffer) logicalValue).duplicate();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        return JSON_NODE_FACTORY.binaryNode(bytes);
                    } else if (logicalValue instanceof BigDecimal) {
                        return JSON_NODE_FACTORY.numberNode((BigDecimal) logicalValue);
                    } else {
                        throw new ConnectException("Invalid type for bytes type: " + logicalValue.getClass());
                    }
                case ARRAY: {
                    Collection<?> collection = (Collection<?>) logicalValue;
                    // The element schema is the same for every element, so resolve it once.
                    Schema elementSchema = schema == null ? null : schema.getValueSchema();
                    ArrayNode list = JSON_NODE_FACTORY.arrayNode();
                    for (Object elem : collection) {
                        list.add(fromConnectData(elementSchema, elem));
                    }
                    return list;
                }
                case MAP: {
                    Map<?, ?> map = (Map<?, ?>) logicalValue;
                    Schema keySchema = schema == null ? null : schema.getKeySchema();
                    Schema valueSchema = schema == null ? null : schema.getValueSchema();
                    // If true, using string keys and JSON object; if false, using non-string keys and
                    // Array-encoding
                    boolean objectMode;
                    if (schema == null) {
                        objectMode = true;
                        for (Map.Entry<?, ?> entry : map.entrySet()) {
                            if (!(entry.getKey() instanceof String)) {
                                objectMode = false;
                                break;
                            }
                        }
                    } else {
                        objectMode = keySchema.getFieldType() == FieldType.STRING && !keySchema.isOptional();
                    }
                    ObjectNode obj = null;
                    ArrayNode list = null;
                    if (objectMode) {
                        obj = JSON_NODE_FACTORY.objectNode();
                    } else {
                        list = JSON_NODE_FACTORY.arrayNode();
                    }
                    for (Map.Entry<?, ?> entry : map.entrySet()) {
                        JsonNode mapKey = fromConnectData(keySchema, entry.getKey());
                        JsonNode mapValue = fromConnectData(valueSchema, entry.getValue());
                        if (objectMode) {
                            if (mapKey == null) {
                                // A null key under a required string key schema has no JSON field
                                // name; report it instead of failing with a NullPointerException.
                                throw new ConnectException(
                                        "Map key must not be null when the map is encoded as a JSON object.");
                            }
                            obj.set(mapKey.asText(), mapValue);
                        } else {
                            ObjectNode o = JSON_NODE_FACTORY.objectNode();
                            o.set(KEY_FIELD, mapKey);
                            o.set(VALUE_FIELD, mapValue);
                            list.add(o);
                        }
                    }
                    return objectMode ? obj : list;
                }
                case STRUCT: {
                    Struct struct = (Struct) logicalValue;
                    // Also rejects a null schema, so schema is non-null below.
                    if (!struct.schema().equals(schema)) {
                        throw new ConnectException("Mismatching schema.");
                    }
                    //This handles the inverting of a union which is held as a struct, where each field is
                    // one of the union types.
                    if (JSON_TYPE_ONE_OF.equals(schema.getName())) {
                        for (Field field : schema.getFields()) {
                            Object object = struct.get(field);
                            if (object != null) {
                                return fromConnectData(field.getSchema(), object);
                            }
                        }
                        // No union member is set: fall back to the null handling for this schema.
                        return fromConnectData(schema, null);
                    } else {
                        ObjectNode obj = JSON_NODE_FACTORY.objectNode();
                        for (Field field : schema.getFields()) {
                            JsonNode jsonNode = fromConnectData(field.getSchema(), struct.get(field));
                            // A null result means "no value to emit", so the field is omitted.
                            if (jsonNode != null) {
                                obj.set(field.getName(), jsonNode);
                            }
                        }
                        return obj;
                    }
                }
                default:
                    break;
            }

            throw new ConnectException("Couldn't convert " + logicalValue + " to JSON.");
        } catch (ClassCastException e) {
            String schemaTypeStr = (schema != null) ? schema.getFieldType().toString() : "unknown schema";
            // Keep the original exception as the cause so the failing cast stays diagnosable.
            throw new ConnectException("Invalid type for " + schemaTypeStr + ": " + logicalValue.getClass(), e);
        }
    }