in src/main/java/com/azure/cosmos/kafka/connect/source/JsonToStruct.java [54:100]
private Schema inferSchema(JsonNode jsonNode) {
switch (jsonNode.getNodeType()) {
case NULL:
return Schema.OPTIONAL_STRING_SCHEMA;
case BOOLEAN:
return Schema.BOOLEAN_SCHEMA;
case NUMBER:
if (jsonNode.isIntegralNumber()) {
return Schema.INT64_SCHEMA;
} else {
return Schema.FLOAT64_SCHEMA;
}
case ARRAY:
List<JsonNode> jsonValues = new ArrayList<>();
SchemaBuilder arrayBuilder;
jsonNode.forEach(jn -> jsonValues.add(jn));
Schema firstItemSchema = jsonValues.isEmpty() ? Schema.OPTIONAL_STRING_SCHEMA
: inferSchema(jsonValues.get(0));
if (jsonValues.isEmpty() || jsonValues.stream()
.anyMatch(jv -> !Objects.equals(inferSchema(jv), firstItemSchema))) {
// If array is emtpy or it contains elements with different schema types
arrayBuilder = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA);
arrayBuilder.name(generateName(arrayBuilder));
return arrayBuilder.optional().build();
}
arrayBuilder = SchemaBuilder.array(inferSchema(jsonValues.get(0)));
arrayBuilder.name(generateName(arrayBuilder));
return arrayBuilder.optional().build();
case OBJECT:
SchemaBuilder structBuilder = SchemaBuilder.struct();
Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields();
while (it.hasNext()) {
Map.Entry<String, JsonNode> entry = it.next();
structBuilder.field(entry.getKey(), inferSchema(entry.getValue()));
}
structBuilder.name(generateName(structBuilder));
return structBuilder.build();
case STRING:
return Schema.STRING_SCHEMA;
case BINARY:
case MISSING:
case POJO:
default:
return null;
}
}