public Schema asConnectSchema()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [628:745]


    public Schema asConnectSchema(JSONObject jsonSchema) {
        // schema null
        if (jsonSchema == null) {
            return null;
        }
        Schema cached = toConnectSchemaCache.get(jsonSchema);
        if (cached != null) {
            return cached;
        }

        String schemaType = String.valueOf(jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME));
        if (StringUtils.isEmpty(schemaType)) {
            throw new ConnectException("Schema must contain 'type' field");
        }
        final SchemaBuilder builder;
        switch (schemaType) {
            case JsonSchema.BOOLEAN_TYPE_NAME:
                builder = SchemaBuilder.bool();
                break;
            case JsonSchema.INT8_TYPE_NAME:
                builder = SchemaBuilder.int8();
                break;
            case JsonSchema.INT16_TYPE_NAME:
                builder = SchemaBuilder.int16();
                break;
            case JsonSchema.INT32_TYPE_NAME:
                builder = SchemaBuilder.int32();
                break;
            case JsonSchema.INT64_TYPE_NAME:
                builder = SchemaBuilder.int64();
                break;
            case JsonSchema.FLOAT_TYPE_NAME:
                builder = SchemaBuilder.float32();
                break;
            case JsonSchema.DOUBLE_TYPE_NAME:
                builder = SchemaBuilder.float64();
                break;
            case JsonSchema.BYTES_TYPE_NAME:
                builder = SchemaBuilder.bytes();
                break;
            case JsonSchema.STRING_TYPE_NAME:
                builder = SchemaBuilder.string();
                break;
            case JsonSchema.ARRAY_TYPE_NAME:
                JSONObject elemSchema = (JSONObject) jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
                if (Objects.isNull(elemSchema)) {
                    throw new ConnectException("Array schema did not specify the element type");
                }
                builder = SchemaBuilder.array(asConnectSchema(elemSchema));
                break;
            case JsonSchema.MAP_TYPE_NAME:
                JSONObject keySchema = (JSONObject) jsonSchema.get(JsonSchema.MAP_KEY_FIELD_NAME);
                if (keySchema == null) {
                    throw new ConnectException("Map schema did not specify the key type");
                }
                JSONObject valueSchema = (JSONObject) jsonSchema.get(JsonSchema.MAP_VALUE_FIELD_NAME);
                if (valueSchema == null) {
                    throw new ConnectException("Map schema did not specify the value type");
                }
                builder = SchemaBuilder.map(asConnectSchema(keySchema), asConnectSchema(valueSchema));
                break;
            case JsonSchema.STRUCT_TYPE_NAME:
                builder = SchemaBuilder.struct();
                List<JSONObject> fields = (List<JSONObject>) jsonSchema.get(JsonSchema.STRUCT_FIELDS_FIELD_NAME);
                if (Objects.isNull(fields)) {
                    throw new ConnectException("Struct schema's \"fields\" argument is not an array.");
                }
                for (JSONObject field : fields) {
                    String jsonFieldName = field.getString(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME);
                    if (jsonFieldName == null) {
                        throw new ConnectException("Struct schema's field name not specified properly");
                    }
                    builder.field(jsonFieldName, asConnectSchema(field));
                }
                break;
            default:
                throw new ConnectException("Unknown schema type: " + schemaType);
        }

        // optional
        Boolean isOptional = jsonSchema.getBoolean(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
        if (isOptional != null && isOptional) {
            builder.optional();
        }

        // schema name
        String schemaName = jsonSchema.getString(JsonSchema.SCHEMA_NAME_FIELD_NAME);
        builder.name(schemaName);

        // schema version
        Object version = jsonSchema.get(JsonSchema.SCHEMA_VERSION_FIELD_NAME);
        if (version != null && version instanceof Integer) {
            builder.version(Integer.parseInt(version.toString()));
        }

        // schema doc
        String doc = jsonSchema.getString(JsonSchema.SCHEMA_DOC_FIELD_NAME);
        if (StringUtils.isNotEmpty(doc)) {
            builder.doc(doc);
        }

        // schema parameter
        JSONObject schemaParams = (JSONObject) jsonSchema.get(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME);
        if (schemaParams != null) {
            Map<String, Object> paramsIt = schemaParams.getInnerMap();
            paramsIt.forEach((k, v) -> {
                builder.parameter(k, String.valueOf(v));
            });
        }

        Object schemaDefaultNode = jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME);
        if (schemaDefaultNode != null) {
            builder.defaultValue(convertToConnect(builder.build(), schemaDefaultNode));
        }
        Schema result = builder.build();
        toConnectSchemaCache.put(jsonSchema, result);
        return result;
    }