in src/main/java/com/azure/cosmos/kafka/connect/source/JsonToStruct.java [30:52]
/**
 * Converts a JSON record into a Kafka Connect {@link SchemaAndValue}.
 *
 * <p>The schema is obtained from {@code inferSchema(node)}. Every field of that schema is then
 * filled from the JSON property with the same name, converted through
 * {@code toSchemaAndValue}. When the JSON property is absent, the field's schema default is
 * used if the field is optional or declares a default; otherwise the omission is logged and
 * the field is left unset (best effort, no exception is raised here).
 *
 * @param node the JSON record to convert
 * @return the inferred schema paired with the populated struct, or a pair holding a
 *         {@code null} schema and {@code null} value when no schema could be inferred
 */
public SchemaAndValue recordToSchemaAndValue(final JsonNode node) {
    final Schema nodeSchema = inferSchema(node);
    if (nodeSchema == null) {
        // The Struct constructor dereferences its schema, so the null check has to happen
        // before a Struct is built; without a schema there is nothing to populate.
        return new SchemaAndValue(null, null);
    }

    final Struct struct = new Struct(nodeSchema);
    nodeSchema.fields().forEach(field -> {
        final JsonNode fieldValue = node.get(field.name());
        if (fieldValue != null) {
            // Property present in the JSON: convert it according to the field's schema.
            final SchemaAndValue converted = toSchemaAndValue(field.schema(), fieldValue);
            struct.put(field, converted.value());
            return;
        }

        // Property absent: fall back to the schema default where that is permitted.
        final Object defaultValue = field.schema().defaultValue();
        if (field.schema().isOptional() || defaultValue != null) {
            struct.put(field, defaultValue);
        } else {
            logger.error("Missing value for field {}", field.name());
        }
    });
    return new SchemaAndValue(nodeSchema, struct);
}