public SchemaAndValue recordToSchemaAndValue()

in src/main/java/com/azure/cosmos/kafka/connect/source/JsonToStruct.java [30:52]


    /**
     * Converts a JSON node into a Connect {@link SchemaAndValue} backed by a {@link Struct}.
     *
     * <p>The schema is obtained from {@code inferSchema(node)}. For every field of that schema
     * the matching child of {@code node} is looked up by name:
     * <ul>
     *   <li>if a child is present, it is converted with {@code toSchemaAndValue} and its value
     *       is stored in the struct;</li>
     *   <li>if no child is present and the field is optional or declares a default, the
     *       field's default value (possibly {@code null}) is stored;</li>
     *   <li>otherwise an error is logged and the field is left unset.</li>
     * </ul>
     *
     * @param node the JSON node to convert
     * @return the inferred schema paired with the populated struct, or a
     *         {@code SchemaAndValue} with a {@code null} schema and {@code null} value when
     *         no schema could be inferred
     */
    public SchemaAndValue recordToSchemaAndValue(final JsonNode node) {
        final Schema nodeSchema = inferSchema(node);

        // The Struct constructor dereferences its schema, so the null check has to happen
        // before the struct is created rather than after it.
        if (nodeSchema == null) {
            return new SchemaAndValue(null, null);
        }

        final Struct struct = new Struct(nodeSchema);
        nodeSchema.fields().forEach(field -> {
            final JsonNode fieldValue = node.get(field.name());
            if (fieldValue != null) {
                final SchemaAndValue schemaAndValue = toSchemaAndValue(field.schema(), fieldValue);
                struct.put(field, schemaAndValue.value());
                return;
            }

            // No child with this name: fall back to the schema default when that is permitted.
            final boolean optionalField = field.schema().isOptional();
            final Object defaultValue = field.schema().defaultValue();
            if (optionalField || defaultValue != null) {
                struct.put(field, defaultValue);
            } else {
                // Best effort: report the gap and leave the field unset instead of failing.
                logger.error("Missing value for field {}", field.name());
            }
        });
        return new SchemaAndValue(nodeSchema, struct);
    }