in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [754:806]
private Object convertToConnect(Schema schema, Object value) {
final FieldType schemaType;
if (schema != null) {
schemaType = schema.getFieldType();
if (value == null) {
if (schema.getDefaultValue() != null) {
return schema.getDefaultValue(); // any logical type conversions should already have been applied
}
if (schema.isOptional()) {
return null;
}
throw new ConnectException("Invalid null value for required " + schemaType + " field");
}
} else {
if (value == null) {
return null;
} else if (value instanceof String) {
schemaType = FieldType.STRING;
} else if (value instanceof Integer) {
schemaType = FieldType.INT32;
} else if (value instanceof Long) {
schemaType = FieldType.INT64;
} else if (value instanceof Float) {
schemaType = FieldType.FLOAT32;
} else if (value instanceof Double) {
schemaType = FieldType.FLOAT64;
} else if (value instanceof BigDecimal) {
schemaType = FieldType.FLOAT64;
} else if (value instanceof Boolean) {
schemaType = FieldType.BOOLEAN;
} else if (value instanceof List) {
schemaType = FieldType.ARRAY;
} else if (value instanceof Map) {
schemaType = FieldType.MAP;
} else {
schemaType = null;
}
}
final JsonToConnectTypeConverter typeConverter = TO_CONNECT_CONVERTERS.get(schemaType);
if (typeConverter == null) {
throw new ConnectException("Unknown schema type: " + schemaType);
}
if (schema != null && schema.getName() != null) {
LogicalTypeConverter logicalConverter = LOGICAL_CONVERTERS.get(schema.getName());
if (logicalConverter != null) {
return logicalConverter.toConnect(schema, value);
}
}
return typeConverter.convert(schema, value);
}