in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [628:745]
/**
 * Converts a JSON schema envelope into a Connect {@link Schema}.
 *
 * <p>The result for a given {@code jsonSchema} is stored in {@code toConnectSchemaCache} and
 * returned directly on subsequent calls with an equal key. Nested element, key, value and
 * struct-field schemas are converted by recursive calls to this method.
 *
 * @param jsonSchema the JSON description of the schema; may be {@code null}
 * @return the corresponding Connect schema, or {@code null} when {@code jsonSchema} is {@code null}
 * @throws ConnectException if the {@code type} field is missing, empty or unknown; if an array
 *     schema has no element schema; if a map schema has no key or value schema; if a struct
 *     schema's fields entry is not a list; or if a struct field has no name
 */
public Schema asConnectSchema(JSONObject jsonSchema) {
    // A missing schema maps to a missing (null) Connect schema.
    if (jsonSchema == null) {
        return null;
    }
    Schema cached = toConnectSchemaCache.get(jsonSchema);
    if (cached != null) {
        return cached;
    }

    // Check the raw value first: String.valueOf(null) yields the literal "null", which would
    // slip past the emptiness check and surface as a misleading "Unknown schema type: null".
    Object rawSchemaType = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
    String schemaType = rawSchemaType == null ? null : String.valueOf(rawSchemaType);
    if (StringUtils.isEmpty(schemaType)) {
        throw new ConnectException("Schema must contain 'type' field");
    }

    final SchemaBuilder builder;
    switch (schemaType) {
        case JsonSchema.BOOLEAN_TYPE_NAME:
            builder = SchemaBuilder.bool();
            break;
        case JsonSchema.INT8_TYPE_NAME:
            builder = SchemaBuilder.int8();
            break;
        case JsonSchema.INT16_TYPE_NAME:
            builder = SchemaBuilder.int16();
            break;
        case JsonSchema.INT32_TYPE_NAME:
            builder = SchemaBuilder.int32();
            break;
        case JsonSchema.INT64_TYPE_NAME:
            builder = SchemaBuilder.int64();
            break;
        case JsonSchema.FLOAT_TYPE_NAME:
            builder = SchemaBuilder.float32();
            break;
        case JsonSchema.DOUBLE_TYPE_NAME:
            builder = SchemaBuilder.float64();
            break;
        case JsonSchema.BYTES_TYPE_NAME:
            builder = SchemaBuilder.bytes();
            break;
        case JsonSchema.STRING_TYPE_NAME:
            builder = SchemaBuilder.string();
            break;
        case JsonSchema.ARRAY_TYPE_NAME:
            JSONObject elemSchema = (JSONObject) jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
            if (elemSchema == null) {
                throw new ConnectException("Array schema did not specify the element type");
            }
            builder = SchemaBuilder.array(asConnectSchema(elemSchema));
            break;
        case JsonSchema.MAP_TYPE_NAME:
            JSONObject keySchema = (JSONObject) jsonSchema.get(JsonSchema.MAP_KEY_FIELD_NAME);
            if (keySchema == null) {
                throw new ConnectException("Map schema did not specify the key type");
            }
            JSONObject valueSchema = (JSONObject) jsonSchema.get(JsonSchema.MAP_VALUE_FIELD_NAME);
            if (valueSchema == null) {
                throw new ConnectException("Map schema did not specify the value type");
            }
            builder = SchemaBuilder.map(asConnectSchema(keySchema), asConnectSchema(valueSchema));
            break;
        case JsonSchema.STRUCT_TYPE_NAME:
            builder = SchemaBuilder.struct();
            // Reject both a missing entry and a non-list entry with the same ConnectException,
            // instead of letting a non-list value escape as a ClassCastException.
            Object fieldsNode = jsonSchema.get(JsonSchema.STRUCT_FIELDS_FIELD_NAME);
            if (!(fieldsNode instanceof List)) {
                throw new ConnectException("Struct schema's \"fields\" argument is not an array.");
            }
            @SuppressWarnings("unchecked")
            List<JSONObject> fields = (List<JSONObject>) fieldsNode;
            for (JSONObject field : fields) {
                String jsonFieldName = field.getString(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME);
                if (jsonFieldName == null) {
                    throw new ConnectException("Struct schema's field name not specified properly");
                }
                // The field entry carries its own schema description alongside the field name.
                builder.field(jsonFieldName, asConnectSchema(field));
            }
            break;
        default:
            throw new ConnectException("Unknown schema type: " + schemaType);
    }

    // optional
    Boolean isOptional = jsonSchema.getBoolean(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
    if (isOptional != null && isOptional) {
        builder.optional();
    }

    // schema name
    String schemaName = jsonSchema.getString(JsonSchema.SCHEMA_NAME_FIELD_NAME);
    builder.name(schemaName);

    // schema version: only applied when the stored value is an Integer
    Object version = jsonSchema.get(JsonSchema.SCHEMA_VERSION_FIELD_NAME);
    if (version instanceof Integer) {
        builder.version(((Integer) version).intValue());
    }

    // schema doc
    String doc = jsonSchema.getString(JsonSchema.SCHEMA_DOC_FIELD_NAME);
    if (StringUtils.isNotEmpty(doc)) {
        builder.doc(doc);
    }

    // schema parameters: every value is stored in its string form
    JSONObject schemaParams = (JSONObject) jsonSchema.get(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME);
    if (schemaParams != null) {
        Map<String, Object> params = schemaParams.getInnerMap();
        for (Map.Entry<String, Object> param : params.entrySet()) {
            builder.parameter(param.getKey(), String.valueOf(param.getValue()));
        }
    }

    // schema default: converted against the schema as built so far, then attached to the builder
    Object schemaDefaultNode = jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME);
    if (schemaDefaultNode != null) {
        builder.defaultValue(convertToConnect(builder.build(), schemaDefaultNode));
    }

    Schema result = builder.build();
    toConnectSchemaCache.put(jsonSchema, result);
    return result;
}