in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [399:500]
/**
 * Converts a Connect {@link Schema} into its JSON schema representation.
 *
 * <p>Results are memoized in {@code fromConnectSchemaCache}. The cache always holds its own
 * copy of the converted schema and every caller receives a distinct top-level object, so
 * adding entries to the returned object (as the STRUCT branch does when it attaches the
 * field name) cannot leak into later lookups of the same schema.
 *
 * @param schema the schema to convert; may be {@code null}
 * @return the JSON schema, or {@code null} when {@code schema} is {@code null}
 * @throws ConnectException if the schema's field type is not one of the supported types
 */
public JSONObject asJsonSchema(Schema schema) {
    if (schema == null) {
        return null;
    }
    // Serve from the cache when possible, handing out a copy so the cached entry stays untouched.
    JSONObject cached = fromConnectSchemaCache.get(schema);
    if (cached != null) {
        return cached.clone();
    }
    JSONObject jsonSchema;
    // Translate the field type into the matching JSON schema skeleton.
    switch (schema.getFieldType()) {
        case BOOLEAN:
            jsonSchema = JsonSchema.BOOLEAN_SCHEMA();
            break;
        case BYTES:
            jsonSchema = JsonSchema.BYTES_SCHEMA();
            break;
        case FLOAT64:
            jsonSchema = JsonSchema.DOUBLE_SCHEMA();
            break;
        case FLOAT32:
            jsonSchema = JsonSchema.FLOAT_SCHEMA();
            break;
        case INT8:
            jsonSchema = JsonSchema.INT8_SCHEMA();
            break;
        case INT16:
            jsonSchema = JsonSchema.INT16_SCHEMA();
            break;
        case INT32:
            jsonSchema = JsonSchema.INT32_SCHEMA();
            break;
        case INT64:
            jsonSchema = JsonSchema.INT64_SCHEMA();
            break;
        case STRING:
            jsonSchema = JsonSchema.STRING_SCHEMA();
            break;
        case ARRAY:
            jsonSchema = new JSONObject();
            jsonSchema.put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
            jsonSchema.put(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.getValueSchema()));
            break;
        case MAP:
            jsonSchema = new JSONObject();
            jsonSchema.put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
            jsonSchema.put(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.getKeySchema()));
            jsonSchema.put(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.getValueSchema()));
            break;
        case STRUCT:
            jsonSchema = new JSONObject(new ConcurrentHashMap<>());
            jsonSchema.put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
            // Each field contributes its own schema, tagged with the field name.
            JSONArray fields = new JSONArray();
            for (Field field : schema.getFields()) {
                String fieldName = field.getName();
                // The recursive call never returns the cached instance itself, so writing the
                // field name here only affects this struct's copy of the field schema.
                JSONObject fieldJsonSchema = asJsonSchema(field.getSchema());
                fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, fieldName);
                fields.add(fieldJsonSchema);
            }
            jsonSchema.put(JsonSchema.STRUCT_FIELDS_FIELD_NAME, fields);
            break;
        default:
            throw new ConnectException("Couldn't translate unsupported schema type " + schema + ".");
    }
    // optional
    jsonSchema.put(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME, schema.isOptional());
    // name
    if (schema.getName() != null) {
        jsonSchema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, schema.getName());
    }
    // version
    if (schema.getVersion() != null) {
        jsonSchema.put(JsonSchema.SCHEMA_VERSION_FIELD_NAME, schema.getVersion());
    }
    // doc
    if (schema.getDoc() != null) {
        jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.getDoc());
    }
    // parameters
    if (schema.getParameters() != null) {
        JSONObject jsonSchemaParams = new JSONObject();
        for (Map.Entry<String, String> prop : schema.getParameters().entrySet()) {
            jsonSchemaParams.put(prop.getKey(), prop.getValue());
        }
        jsonSchema.put(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
    }
    // default value
    if (schema.getDefaultValue() != null) {
        jsonSchema.put(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.getDefaultValue()));
    }
    // Cache a copy instead of the instance handed back: the caller may add entries to the
    // returned object (e.g. the struct field name above), and that must not alter the cache.
    // NOTE(review): clone() is assumed to copy only the top-level entries, so nested schema
    // objects remain shared between the cached copy and the returned one - confirm.
    fromConnectSchemaCache.put(schema, jsonSchema.clone());
    return jsonSchema;
}