in connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java [65:186]
static {
    // Populates TO_CONNECT_CONVERTERS with one JSON -> Connect converter per Connect schema type.
    // Each converter receives the (possibly null) schema for the value and the JSON node to decode.
    // The scalar converters below do not inspect the JSON node kind themselves; they simply delegate
    // to the matching JsonNode accessor.

    // BOOLEAN: read the node as a Java boolean.
    TO_CONNECT_CONVERTERS.put(Schema.Type.BOOLEAN, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return value.booleanValue();
        }
    });

    // INT8: read as int, then narrow to byte. The narrowing cast does not range-check the value.
    TO_CONNECT_CONVERTERS.put(Schema.Type.INT8, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return (byte) value.intValue();
        }
    });

    // INT16: read as int, then narrow to short. The narrowing cast does not range-check the value.
    TO_CONNECT_CONVERTERS.put(Schema.Type.INT16, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return (short) value.intValue();
        }
    });

    // INT32: read the node as an int.
    TO_CONNECT_CONVERTERS.put(Schema.Type.INT32, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return value.intValue();
        }
    });

    // INT64: read the node as a long.
    TO_CONNECT_CONVERTERS.put(Schema.Type.INT64, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return value.longValue();
        }
    });

    // FLOAT32: read the node as a float.
    TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT32, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return value.floatValue();
        }
    });

    // FLOAT64: read the node as a double.
    TO_CONNECT_CONVERTERS.put(Schema.Type.FLOAT64, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return value.doubleValue();
        }
    });

    // BYTES: read the node's binary content; an I/O failure while reading it is reported as a
    // DataException with the original exception preserved as the cause.
    TO_CONNECT_CONVERTERS.put(Schema.Type.BYTES, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            try {
                return value.binaryValue();
            } catch (IOException e) {
                throw new DataException("Invalid bytes field", e);
            }
        }
    });

    // STRING: read the node's text.
    TO_CONNECT_CONVERTERS.put(Schema.Type.STRING, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            return value.textValue();
        }
    });

    // ARRAY: convert every element with the array's element schema (null when there is no schema).
    TO_CONNECT_CONVERTERS.put(Schema.Type.ARRAY, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            // Reject non-array nodes explicitly, in the same way the MAP and STRUCT converters
            // validate their input. Without this check the loop below would not fail on a JSON
            // object or scalar: it would iterate whatever children that node exposes (an object's
            // field values, or nothing at all) and return a silently wrong list.
            if (!value.isArray())
                throw new DataException("Arrays should be encoded as JSON arrays, but found " + value.getNodeType());
            Schema elemSchema = schema == null ? null : schema.valueSchema();
            // The element count is known up front, so size the list once.
            ArrayList<Object> result = new ArrayList<>(value.size());
            for (JsonNode elem : value) {
                result.add(convertToConnect(elemSchema, elem));
            }
            return result;
        }
    });

    // MAP: decode either a JSON object or a JSON array of [key, value] pairs into a HashMap.
    TO_CONNECT_CONVERTERS.put(Schema.Type.MAP, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            Schema keySchema = schema == null ? null : schema.keySchema();
            Schema valueSchema = schema == null ? null : schema.valueSchema();
            // Maps with string keys are expected in the natural JSON form, i.e. a JSON object.
            // Maps keyed by any other primitive or by a complex type are expected as a JSON array
            // of two-element [key, value] arrays. Without a schema, the JSON object form is assumed.
            Map<Object, Object> result = new HashMap<>();
            // keySchema is only dereferenced when schema is non-null (short-circuit ||).
            if (schema == null || keySchema.type() == Schema.Type.STRING) {
                if (!value.isObject())
                    throw new DataException("Maps with string fields should be encoded as JSON objects, but found " + value.getNodeType());
                Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
                while (fieldIt.hasNext()) {
                    Map.Entry<String, JsonNode> entry = fieldIt.next();
                    // Field names are used as keys verbatim; only the values are converted.
                    result.put(entry.getKey(), convertToConnect(valueSchema, entry.getValue()));
                }
            } else {
                if (!value.isArray())
                    throw new DataException("Maps with non-string fields should be encoded as JSON array of tuples, but found " + value.getNodeType());
                for (JsonNode entry : value) {
                    // Each entry must itself be a two-element array: [key, value].
                    if (!entry.isArray())
                        throw new DataException("Found invalid map entry instead of array tuple: " + entry.getNodeType());
                    if (entry.size() != 2)
                        throw new DataException("Found invalid map entry, expected length 2 but found :" + entry.size());
                    result.put(convertToConnect(keySchema, entry.get(0)),
                            convertToConnect(valueSchema, entry.get(1)));
                }
            }
            return result;
        }
    });

    // STRUCT: decode a JSON object into a Struct, converting each schema field by name.
    TO_CONNECT_CONVERTERS.put(Schema.Type.STRUCT, new JsonToConnectTypeConverter() {
        @Override
        public Object convert(Schema schema, JsonNode value) {
            if (!value.isObject())
                throw new DataException("Structs should be encoded as JSON objects, but found " + value.getNodeType());
            // A Struct has to be constructed with its schema, hence the call to schema.schema().
            // Per the original note on this code, that call only does real work when the schema is
            // still a SchemaBuilder (while translating schemas); during ordinary data translation
            // it just returns the schema object itself and adds no overhead.
            Struct result = new Struct(schema.schema());
            // The schema drives the loop: every declared field is looked up by name in the JSON
            // object and converted with that field's own schema.
            for (Field field : schema.fields())
                result.put(field, convertToConnect(field.schema(), value.get(field.name())));
            return result;
        }
    });
}