in jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/StructTypeConverter.java [258:303]
/**
 * Converts a JSON node into a Connect {@link Struct} described by {@code schema}.
 *
 * <p>Two shapes are handled:
 * <ul>
 *   <li>Union ("oneOf") schemas, recognised by the schema name being
 *       {@code JsonSchemaConverterConstants.JSON_SCHEMA_TYPE_ONEOF}: the first field whose schema
 *       matches {@code value} is populated, under the key {@code "field" + (index + 1)}.</li>
 *   <li>Every other schema: {@code value} must be a JSON object, and each schema field that is
 *       present in that object is converted and stored in the resulting struct.</li>
 * </ul>
 *
 * <p>As a side effect, the {@code jsonNodeToConnectValueConverter} field is replaced on every call
 * with a converter built from {@code jsonSchemaDataConfig}.
 *
 * @param schema               Connect schema describing the expected struct
 * @param value                JSON node to convert
 * @param jsonSchemaDataConfig configuration handed to the nested value converter
 * @return the populated {@link Struct}
 * @throws DataException if no union member matches {@code value}, or if a non-union
 *                       {@code value} is not a JSON object
 */
public Object toConnect(final Schema schema,
                        final JsonNode value,
                        final JsonSchemaDataConfig jsonSchemaDataConfig) {
    jsonNodeToConnectValueConverter = new JsonNodeToConnectValueConverter(jsonSchemaDataConfig);

    final String schemaName = schema.name();
    final boolean isUnion = schemaName != null
            && schemaName.equals(JsonSchemaConverterConstants.JSON_SCHEMA_TYPE_ONEOF);

    if (isUnion) {
        // Union types: wrap the value in a struct holding only the first matching member.
        for (final Field candidate : schema.fields()) {
            final Schema candidateSchema = candidate.schema();
            final boolean matches = isInstanceOfJsonSchemaTypeForSimpleSchema(candidateSchema, value)
                    || structSchemaEquals(candidateSchema, value);
            if (matches) {
                final String memberName = "field" + (candidate.index() + 1);
                final Object memberValue =
                        jsonNodeToConnectValueConverter.toConnectValue(candidateSchema, value);
                return new Struct(schema.schema()).put(memberName, memberValue);
            }
        }
        throw new DataException("Did not find matching union field for data: " + value.toString());
    }

    if (!value.isObject()) {
        throw new DataException("Structs should be encoded as JSON objects, but found " + value.getNodeType());
    }

    // A Struct needs a concrete Schema at construction time, hence schema.schema(). The schema is
    // only a SchemaBuilder while schemas are being translated; during the more common data
    // translation the call simply hands back the schema object and has no overhead.
    final Struct struct = new Struct(schema.schema());
    for (final Field member : schema.fields()) {
        final JsonNode child = value.get(member.name());
        if (child == null) {
            // Field absent from the JSON object: leave it unset in the struct.
            continue;
        }
        struct.put(member, jsonNodeToConnectValueConverter.toConnectValue(member.schema(), child));
    }
    return struct;
}