private Object convertToConnect()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [754:806]


    /**
     * Converts a decoded JSON value into its Connect representation.
     *
     * <p>When a schema is supplied its field type drives the conversion, and a {@code null}
     * value is resolved against the schema: the default value wins if one is set, otherwise
     * {@code null} is returned for optional schemas and an exception is raised for required
     * ones. When no schema is supplied the field type is inferred from the runtime class of
     * the value, and a {@code null} value is returned as {@code null}.
     *
     * <p>If the schema carries a name registered in {@code LOGICAL_CONVERTERS}, that logical
     * converter produces the result; otherwise the converter registered for the resolved
     * field type in {@code TO_CONNECT_CONVERTERS} is used.
     *
     * @param schema the schema describing the value, or {@code null} for schemaless data
     * @param value  the value to convert, may be {@code null}
     * @return the converted value, the schema default, or {@code null}
     * @throws ConnectException if a required field is {@code null}, or if no converter is
     *                          registered for the resolved field type
     */
    private Object convertToConnect(Schema schema, Object value) {
        final boolean schemaless = schema == null;

        // Schemaless nulls have nothing to be validated against; pass them through.
        if (schemaless && value == null) {
            return null;
        }

        final FieldType resolvedType;
        if (schemaless) {
            resolvedType = inferSchemalessFieldType(value);
        } else {
            resolvedType = schema.getFieldType();
            if (value == null) {
                // Per the original note: any logical type conversions should already have
                // been applied to the default value, so it is returned untouched.
                final Object fallback = schema.getDefaultValue();
                if (fallback != null) {
                    return fallback;
                }
                if (!schema.isOptional()) {
                    throw new ConnectException("Invalid null value for required " + resolvedType + " field");
                }
                return null;
            }
        }

        // The type-level converter must exist even when a logical converter ends up being used.
        final JsonToConnectTypeConverter primitiveConverter = TO_CONNECT_CONVERTERS.get(resolvedType);
        if (primitiveConverter == null) {
            throw new ConnectException("Unknown schema type: " + resolvedType);
        }

        // A named schema may map to a logical converter, which takes precedence.
        final String logicalName = schemaless ? null : schema.getName();
        if (logicalName != null) {
            final LogicalTypeConverter logical = LOGICAL_CONVERTERS.get(logicalName);
            if (logical != null) {
                return logical.toConnect(schema, value);
            }
        }

        return primitiveConverter.convert(schema, value);
    }

    /**
     * Infers the field type of a non-null schemaless value from its runtime class.
     *
     * @param value the value to inspect
     * @return the matching field type, or {@code null} when the class is not recognised
     */
    private static FieldType inferSchemalessFieldType(Object value) {
        if (value instanceof String) {
            return FieldType.STRING;
        }
        if (value instanceof Integer) {
            return FieldType.INT32;
        }
        if (value instanceof Long) {
            return FieldType.INT64;
        }
        if (value instanceof Float) {
            return FieldType.FLOAT32;
        }
        // Double and BigDecimal both map to FLOAT64.
        if (value instanceof Double || value instanceof BigDecimal) {
            return FieldType.FLOAT64;
        }
        if (value instanceof Boolean) {
            return FieldType.BOOLEAN;
        }
        if (value instanceof List) {
            return FieldType.ARRAY;
        }
        if (value instanceof Map) {
            return FieldType.MAP;
        }
        return null;
    }