static

in schema-converter/json-schema-converter/src/main/java/org/apache/rocketmq/schema/json/JsonSchemaData.java [100:209]


    static {
        TO_CONNECT_CONVERTERS.put(FieldType.BOOLEAN, (schema, value) -> value.booleanValue());
        TO_CONNECT_CONVERTERS.put(FieldType.INT8, (schema, value) -> (byte) value.shortValue());
        TO_CONNECT_CONVERTERS.put(FieldType.INT16, (schema, value) -> value.shortValue());
        TO_CONNECT_CONVERTERS.put(FieldType.INT32, (schema, value) -> value.intValue());
        TO_CONNECT_CONVERTERS.put(FieldType.INT64, (schema, value) -> value.longValue());
        TO_CONNECT_CONVERTERS.put(FieldType.FLOAT32, (schema, value) -> value.floatValue());
        TO_CONNECT_CONVERTERS.put(FieldType.FLOAT64, (schema, value) -> value.doubleValue());
        TO_CONNECT_CONVERTERS.put(FieldType.BYTES, (schema, value) -> {
            try {
                Object o = value.binaryValue();
                if (o == null) {
                    o = value.decimalValue();  // decimal logical type
                }
                return o;
            } catch (IOException e) {
                throw new ConnectException("Invalid bytes field", e);
            }
        });
        TO_CONNECT_CONVERTERS.put(FieldType.STRING, (schema, value) -> value.textValue());
        TO_CONNECT_CONVERTERS.put(FieldType.ARRAY, (schema, value) -> {
            Schema elemSchema = schema == null ? null : schema.getValueSchema();
            ArrayList<Object> result = new ArrayList<>();
            for (JsonNode elem : value) {
                result.add(toConnectData(elemSchema, elem));
            }
            return result;
        });
        TO_CONNECT_CONVERTERS.put(FieldType.MAP, (schema, value) -> {
            Schema keySchema = schema == null ? null : schema.getKeySchema();
            Schema valueSchema = schema == null ? null : schema.getValueSchema();
            Map<Object, Object> result = new HashMap<>();
            if (schema == null || (keySchema.getFieldType() == FieldType.STRING && !keySchema.isOptional())) {
                if (!value.isObject()) {
                    throw new ConnectException(
                            "Maps with string fields should be encoded as JSON objects, but found "
                                    + value.getNodeType());
                }
                Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
                while (fieldIt.hasNext()) {
                    Map.Entry<String, JsonNode> entry = fieldIt.next();
                    result.put(entry.getKey(), toConnectData(valueSchema, entry.getValue()));
                }
            } else {
                if (!value.isArray()) {
                    throw new ConnectException(
                            "Maps with non-string fields should be encoded as JSON array of objects, but "
                                    + "found "
                                    + value.getNodeType());
                }
                for (JsonNode entry : value) {
                    if (!entry.isObject()) {
                        throw new ConnectException("Found invalid map entry instead of object: "
                                + entry.getNodeType());
                    }
                    if (entry.size() != 2) {
                        throw new ConnectException("Found invalid map entry, expected length 2 but found :" + entry
                                .size());
                    }
                    result.put(toConnectData(keySchema, entry.get(KEY_FIELD)),
                            toConnectData(valueSchema, entry.get(VALUE_FIELD))
                    );
                }
            }
            return result;
        });
        TO_CONNECT_CONVERTERS.put(FieldType.STRUCT, (schema, value) -> {
            if (schema.getName() != null && schema.getName().equals(JSON_TYPE_ONE_OF)) {
                int numMatchingProperties = -1;
                Field matchingField = null;
                for (Field field : schema.getFields()) {
                    Schema fieldSchema = field.getSchema();

                    if (isSimpleSchema(fieldSchema, value)) {
                        return new Struct(schema).put(JSON_TYPE_ONE_OF + ".field." + field.getIndex(),
                                toConnectData(fieldSchema, value)
                        );
                    } else {
                        int matching = matchStructSchema(fieldSchema, value);
                        if (matching > numMatchingProperties) {
                            numMatchingProperties = matching;
                            matchingField = field;
                        }
                    }
                }
                if (matchingField != null) {
                    return new Struct(schema).put(
                            JSON_TYPE_ONE_OF + ".field." + matchingField.getIndex(),
                            toConnectData(matchingField.getSchema(), value)
                    );
                }
                throw new ConnectException("Did not find matching oneof field for data: " + value.toString());
            } else {
                if (!value.isObject()) {
                    throw new ConnectException("Structs should be encoded as JSON objects, but found "
                            + value.getNodeType());
                }

                Struct result = new Struct(schema);
                for (Field field : schema.getFields()) {
                    Object fieldValue = toConnectData(field.getSchema(), value.get(field.getName()));
                    if (fieldValue != null) {
                        result.put(field, fieldValue);
                    }
                }

                return result;
            }
        });
    }