in src/main/java/com/azure/cosmos/kafka/connect/source/JsonToStruct.java [107:143]
/**
 * Converts a JSON node into a Connect {@link SchemaAndValue}, dispatching on the schema's type.
 *
 * <p>An optional schema paired with a JSON null yields a {@code null} value. Numeric, string,
 * array and struct types are delegated to dedicated helpers; booleans are read with
 * {@code node.asBoolean()}; BYTES values are decoded with {@code node.binaryValue()}.
 * MAP values and unsupported schema types are returned as the unconverted node
 * (unsupported types are additionally logged as an error).
 *
 * @param schema the schema describing the expected type of {@code node}
 * @param node the JSON node to convert
 * @return the schema paired with the converted value
 * @throws java.io.UncheckedIOException if a BYTES value cannot be decoded from the node
 */
private SchemaAndValue toSchemaAndValue(final Schema schema, final JsonNode node) {
    // A JSON null is only mapped to a Connect null when the schema allows it.
    if (schema.isOptional() && node.isNull()) {
        return new SchemaAndValue(schema, null);
    }
    switch (schema.type()) {
        case INT8:
        case INT16:
        case INT32:
        case INT64:
        case FLOAT32:
        case FLOAT64:
            return numberToSchemaAndValue(schema, node);
        case BOOLEAN:
            return new SchemaAndValue(schema, node.asBoolean());
        case STRING:
            return stringToSchemaAndValue(schema, node);
        case BYTES:
            // Connect's BYTES type must carry byte[] (or ByteBuffer), not a JsonNode.
            // Per the Jackson documentation, binaryValue() returns the content of binary nodes,
            // base64-decodes text nodes, and returns null for any other node type.
            try {
                return new SchemaAndValue(schema, node.binaryValue());
            } catch (final java.io.IOException e) {
                throw new java.io.UncheckedIOException("Unable to decode BYTES value from JSON node", e);
            }
        case ARRAY:
            return arrayToSchemaAndValue(schema, node);
        case MAP:
            // NOTE(review): the node is passed through unconverted for MAP schemas;
            // confirm whether a java.util.Map is expected downstream.
            return new SchemaAndValue(schema, node);
        case STRUCT:
            // The helper receives only the node, so the schema in the result is whatever it produces.
            return recordToSchemaAndValue(node);
        default:
            // Best effort: report the unsupported type and hand back the unconverted node.
            logger.error("Unsupported Schema type: {}", schema.type());
            return new SchemaAndValue(schema, node);
    }
}