in jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/MapTypeConverter.java [163:203]
/**
 * Converts a JSON-encoded map into a Connect map value.
 *
 * <p>Two encodings are accepted, selected by the key schema:
 * <ul>
 *   <li>no schema, or a non-optional {@code STRING} key schema: {@code value} must be a JSON
 *       object whose field names become the map keys;</li>
 *   <li>any other key schema: {@code value} must be a JSON array whose elements are objects
 *       holding exactly two fields, named by {@link JsonSchemaConverterConstants#KEY_FIELD} and
 *       {@link JsonSchemaConverterConstants#VALUE_FIELD}.</li>
 * </ul>
 *
 * @param schema               Connect schema of the map, or {@code null} for schemaless data
 * @param value                JSON node holding the encoded map
 * @param jsonSchemaDataConfig configuration used to build the nested value converter
 * @return a {@link HashMap} of the converted keys and values
 * @throws DataException if {@code value} does not use the encoding required by the key schema,
 *                       or if an array entry is not an object of exactly two fields carrying
 *                       both the key field and the value field
 */
public Object toConnect(final Schema schema,
                        final JsonNode value,
                        final JsonSchemaDataConfig jsonSchemaDataConfig) {
    // NOTE(review): this reassigns an instance field on every call; if this converter is shared
    // between threads that looks racy - confirm against the field's declaration and callers.
    jsonNodeToConnectValueConverter = new JsonNodeToConnectValueConverter(jsonSchemaDataConfig);
    Schema keySchema = schema == null ? null : schema.keySchema();
    Schema valueSchema = schema == null ? null : schema.valueSchema();
    // If the map uses strings for keys, it should be encoded in the natural JSON format. If it uses other
    // primitive types or a complex type as a key, it will be encoded as a list of pairs. If we don't have a
    // schema, we default to encoding in a Map.
    Map<Object, Object> result = new HashMap<>();
    if (schema == null || (!keySchema.isOptional() && keySchema.type() == Schema.Type.STRING)) {
        if (!value.isObject()) {
            throw new DataException(
                    "Maps with string fields should be encoded as JSON objects, but found " + value.getNodeType());
        }
        Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
        while (fieldIt.hasNext()) {
            Map.Entry<String, JsonNode> entry = fieldIt.next();
            result.put(entry.getKey(),
                    jsonNodeToConnectValueConverter.toConnectValue(valueSchema, entry.getValue()));
        }
    } else {
        if (!value.isArray()) {
            throw new DataException(
                    "Maps with non-string fields should be encoded as JSON array of tuples, but found "
                            + value.getNodeType());
        }
        for (JsonNode entry : value) {
            if (!entry.isObject()) {
                throw new DataException("Found invalid map entry instead of object: " + entry.getNodeType());
            }
            if (entry.size() != 2) {
                throw new DataException("Found invalid map entry, expected length 2 but found :" + entry.size());
            }
            JsonNode keyNode = entry.get(JsonSchemaConverterConstants.KEY_FIELD);
            JsonNode valueNode = entry.get(JsonSchemaConverterConstants.VALUE_FIELD);
            // A two-field object with the wrong field names would otherwise hand Java nulls to the
            // nested converter; reject it here like every other malformed entry.
            if (keyNode == null || valueNode == null) {
                throw new DataException("Found invalid map entry, expected fields '"
                        + JsonSchemaConverterConstants.KEY_FIELD + "' and '"
                        + JsonSchemaConverterConstants.VALUE_FIELD + "'");
            }
            result.put(jsonNodeToConnectValueConverter.toConnectValue(keySchema, keyNode),
                    jsonNodeToConnectValueConverter.toConnectValue(valueSchema, valueNode));
        }
    }
    return result;
}