public Object toConnect()

in jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/MapTypeConverter.java [163:203]


    public Object toConnect(final Schema schema,
                            final JsonNode value,
                            final JsonSchemaDataConfig jsonSchemaDataConfig) {
        jsonNodeToConnectValueConverter = new JsonNodeToConnectValueConverter(jsonSchemaDataConfig);
        Schema keySchema = schema == null ? null : schema.keySchema();
        Schema valueSchema = schema == null ? null : schema.valueSchema();

        // If the map uses strings for keys, it should be encoded in the natural JSON format. If it uses other
        // primitive types or a complex type as a key, it will be encoded as a list of pairs. If we don't have a
        // schema, we default to encoding in a Map.
        Map<Object, Object> result = new HashMap<>();
        if (schema == null || (!keySchema.isOptional() && keySchema.type() == Schema.Type.STRING)) {
            if (!value.isObject()) throw new DataException(
                    "Maps with string fields should be encoded as JSON objects, but found " + value.getNodeType());
            Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
            while (fieldIt.hasNext()) {
                Map.Entry<String, JsonNode> entry = fieldIt.next();
                result.put(entry.getKey(),
                           jsonNodeToConnectValueConverter.toConnectValue(valueSchema, entry.getValue()));
            }
        } else {
            if (!value.isArray()) {
                throw new DataException(
                        "Maps with non-string fields should be encoded as JSON array of tuples, but found "
                        + value.getNodeType());
            }
            for (JsonNode entry : value) {
                if (!entry.isObject()) {
                    throw new DataException("Found invalid map entry instead of object: " + entry.getNodeType());
                }
                if (entry.size() != 2) {
                    throw new DataException("Found invalid map entry, expected length 2 but found :" + entry.size());
                }
                result.put(jsonNodeToConnectValueConverter.toConnectValue(keySchema, entry.get(
                        JsonSchemaConverterConstants.KEY_FIELD)),
                           jsonNodeToConnectValueConverter.toConnectValue(valueSchema, entry.get(
                                   JsonSchemaConverterConstants.VALUE_FIELD)));
            }
        }
        return result;
    }