private Object convertToJson()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [506:620]


    private Object convertToJson(Schema schema, Object value) {
        if (value == null) {
            if (schema == null) {
                return null;
            }
            if (schema.getDefaultValue() != null) {
                return convertToJson(schema, schema.getDefaultValue());
            }
            if (schema.isOptional()) {
                return null;
            }
            throw new ConnectException("Conversion error: null value for field that is required and has no default value");
        }

        if (schema != null && schema.getName() != null) {
            LogicalTypeConverter logicalConverter = LOGICAL_CONVERTERS.get(schema.getName());
            if (logicalConverter != null) {
                if (value == null) {
                    return null;
                } else {
                    return logicalConverter.toJson(schema, value, converterConfig);
                }
            }
        }

        try {
            final FieldType schemaType;
            if (schema == null) {
                schemaType = Schema.schemaType(value.getClass());
                if (schemaType == null) {
                    throw new ConnectException("Java class " + value.getClass() + " does not have corresponding schema type.");
                }
            } else {
                schemaType = schema.getFieldType();
            }

            switch (schemaType) {
                case INT8:
                case INT16:
                case INT32:
                case INT64:
                case FLOAT32:
                case FLOAT64:
                case BOOLEAN:
                case STRING:
                    return value;
                case BYTES:
                    if (value instanceof byte[]) {
                        return (byte[]) value;
                    } else if (value instanceof ByteBuffer) {
                        return ((ByteBuffer) value).array();
                    } else {
                        throw new ConnectException("Invalid type for bytes type: " + value.getClass());
                    }
                case ARRAY: {
                    Collection collection = (Collection) value;
                    List list = new ArrayList();
                    for (Object elem : collection) {
                        Schema valueSchema = schema == null ? null : schema.getValueSchema();
                        Object fieldValue = convertToJson(valueSchema, elem);
                        list.add(fieldValue);
                    }
                    return list;
                }
                case MAP: {
                    Map<?, ?> map = (Map<?, ?>) value;
                    boolean objectMode;
                    if (schema == null) {
                        objectMode = true;
                        for (Map.Entry<?, ?> entry : map.entrySet()) {
                            if (!(entry.getKey() instanceof String)) {
                                objectMode = false;
                                break;
                            }
                        }
                    } else {
                        objectMode = schema.getKeySchema().getFieldType() == FieldType.STRING;
                    }

                    JSONArray resultArray = new JSONArray();
                    Map<String, Object> resultMap = new HashMap<>();
                    for (Map.Entry<?, ?> entry : map.entrySet()) {
                        Schema keySchema = schema == null ? null : schema.getKeySchema();
                        Schema valueSchema = schema == null ? null : schema.getValueSchema();
                        Object mapKey = convertToJson(keySchema, entry.getKey());
                        Object mapValue = convertToJson(valueSchema, entry.getValue());
                        if (objectMode) {
                            resultMap.put((String) mapKey, mapValue);
                        } else {
                            JSONArray entryArray = new JSONArray();
                            entryArray.add(0, mapKey);
                            entryArray.add(1, mapValue);
                            resultArray.add(entryArray);
                        }
                    }
                    return objectMode ? resultMap : resultArray;
                }
                case STRUCT: {
                    Struct struct = (Struct) value;
                    if (!struct.schema().equals(schema)) {
                        throw new ConnectException("Mismatching schema.");
                    }
                    JSONObject obj = new JSONObject(new LinkedHashMap());
                    for (Field field : struct.schema().getFields()) {
                        obj.put(field.getName(), convertToJson(field.getSchema(), struct.get(field)));
                    }
                    return obj;
                }
            }
            throw new ConnectException("Couldn't convert " + value + " to JSON.");
        } catch (ClassCastException e) {
            String schemaTypeStr = (schema != null) ? schema.getFieldType().toString() : "unknown schema";
            throw new ConnectException("Invalid type for " + schemaTypeStr + ": " + value.getClass());
        }
    }