in avro-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/avrodata/AvroData.java [362:595]
private static Object fromConnectData(
Schema schema, org.apache.avro.Schema avroSchema,
Object logicalValue, boolean requireContainer,
boolean requireSchemalessContainerNull, boolean enhancedSchemaSupport
) {
Schema.Type schemaType = schema != null
? schema.type()
: schemaTypeForSchemalessJavaType(logicalValue);
if (schemaType == null) {
// Schemaless null data since schema is null and we got a null schema type from the value
if (requireSchemalessContainerNull) {
return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
} else {
return null;
}
}
validateSchemaValue(schema, logicalValue);
if (logicalValue == null) {
// But if this is schemaless, we may not be able to return null directly
if (schema == null && requireSchemalessContainerNull) {
return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
} else {
return null;
}
}
// If this is a logical type, convert it from the convenient Java type to the underlying
// serializeable format
Object value = logicalValue;
if (schema != null && schema.name() != null) {
LogicalTypeConverter logicalConverter = TO_AVRO_LOGICAL_CONVERTERS.get(schema.name());
if (logicalConverter != null) {
value = logicalConverter.convert(schema, logicalValue);
}
}
try {
switch (schemaType) {
case INT8: {
Byte byteValue = (Byte) value; // Check for correct type
Integer convertedByteValue = byteValue == null ? null : byteValue.intValue();
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, convertedByteValue, ANYTHING_SCHEMA_INT_FIELD),
requireContainer);
}
case INT16: {
Short shortValue = (Short) value; // Check for correct type
Integer convertedShortValue = shortValue == null ? null : shortValue.intValue();
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, convertedShortValue, ANYTHING_SCHEMA_INT_FIELD),
requireContainer);
}
case INT32:
Integer intValue = (Integer) value; // Check for correct type
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_INT_FIELD),
requireContainer);
case INT64:
Long longValue = (Long) value; // Check for correct type
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_LONG_FIELD),
requireContainer);
case FLOAT32:
Float floatValue = (Float) value; // Check for correct type
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_FLOAT_FIELD),
requireContainer);
case FLOAT64:
Double doubleValue = (Double) value; // Check for correct type
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_DOUBLE_FIELD),
requireContainer);
case BOOLEAN:
Boolean boolValue = (Boolean) value; // Check for correct type
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_BOOLEAN_FIELD),
requireContainer);
case STRING:
if (enhancedSchemaSupport && schema != null && schema.parameters() != null
&& schema.parameters().containsKey(AVRO_TYPE_ENUM)) {
String enumSchemaName = schema.parameters().get(AVRO_TYPE_ENUM);
org.apache.avro.Schema enumSchema;
if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
int enumIndex = avroSchema.getIndexNamed(enumSchemaName);
enumSchema = avroSchema.getTypes().get(enumIndex);
} else {
enumSchema = avroSchema;
}
value = new GenericData.EnumSymbol(enumSchema, (String) value);
} else {
String stringValue = (String) value; // Check for correct type
}
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, value, ANYTHING_SCHEMA_STRING_FIELD),
requireContainer);
case BYTES: {
ByteBuffer bytesValue = value instanceof byte[] ? ByteBuffer.wrap((byte[]) value) :
(ByteBuffer) value;
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, bytesValue, ANYTHING_SCHEMA_BYTES_FIELD),
requireContainer);
}
case ARRAY: {
Collection<Object> list = (Collection<Object>) value;
List<Object> converted = new ArrayList<>(list.size());
Schema elementSchema = schema != null ? schema.valueSchema() : null;
org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(
schema, avroSchema);
org.apache.avro.Schema elementAvroSchema =
schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA;
for (Object val : list) {
converted.add(
fromConnectData(
elementSchema,
elementAvroSchema,
val,
false,
true,
enhancedSchemaSupport
)
);
}
return maybeAddContainer(
avroSchema,
maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_ARRAY_FIELD),
requireContainer);
}
case MAP: {
Map<Object, Object> map = (Map<Object, Object>) value;
org.apache.avro.Schema underlyingAvroSchema;
if (schema != null && schema.keySchema().type() == Schema.Type.STRING
&& !schema.keySchema().isOptional()) {
underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(schema, avroSchema);
Map<String, Object> converted = new HashMap<>();
for (Map.Entry<Object, Object> entry : map.entrySet()) {
// Key is a String, no conversion needed
Object convertedValue = fromConnectData(schema.valueSchema(),
underlyingAvroSchema.getValueType(),
entry.getValue(), false, true, enhancedSchemaSupport
);
converted.put((String) entry.getKey(), convertedValue);
}
return maybeAddContainer(avroSchema, converted, requireContainer);
} else {
List<GenericRecord> converted = new ArrayList<>(map.size());
underlyingAvroSchema = avroSchemaForUnderlyingMapEntryType(schema, avroSchema);
org.apache.avro.Schema elementSchema =
schema != null
? underlyingAvroSchema.getElementType()
: ANYTHING_SCHEMA_MAP_ELEMENT;
org.apache.avro.Schema avroKeySchema = elementSchema.getField(KEY_FIELD).schema();
org.apache.avro.Schema avroValueSchema = elementSchema.getField(VALUE_FIELD).schema();
for (Map.Entry<Object, Object> entry : map.entrySet()) {
Object keyConverted = fromConnectData(schema != null ? schema.keySchema() : null,
avroKeySchema, entry.getKey(), false, true,
enhancedSchemaSupport);
Object valueConverted = fromConnectData(schema != null ? schema.valueSchema() : null,
avroValueSchema, entry.getValue(), false,
true, enhancedSchemaSupport);
converted.add(
new GenericRecordBuilder(elementSchema)
.set(KEY_FIELD, keyConverted)
.set(VALUE_FIELD, valueConverted)
.build()
);
}
return maybeAddContainer(
avroSchema, maybeWrapSchemaless(schema, converted, ANYTHING_SCHEMA_MAP_FIELD),
requireContainer);
}
}
case STRUCT: {
Struct struct = (Struct) value;
if (!struct.schema().equals(schema)) {
throw new DataException("Mismatching struct schema");
}
//This handles the inverting of a union which is held as a struct, where each field is
// one of the union types.
if (AVRO_TYPE_UNION.equals(schema.name())) {
for (Field field : schema.fields()) {
Object object = struct.get(field);
if (object != null) {
return fromConnectData(
field.schema(),
avroSchema,
object,
false,
true,
enhancedSchemaSupport
);
}
}
return fromConnectData(schema, avroSchema, null, false, true, enhancedSchemaSupport);
} else {
org.apache.avro.Schema underlyingAvroSchema = avroSchemaForUnderlyingTypeIfOptional(
schema, avroSchema);
GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema);
for (Field field : schema.fields()) {
org.apache.avro.Schema.Field theField = underlyingAvroSchema.getField(field.name());
org.apache.avro.Schema fieldAvroSchema = theField.schema();
convertedBuilder.set(
field.name(),
fromConnectData(field.schema(), fieldAvroSchema, struct.get(field), false,
true, enhancedSchemaSupport)
);
}
return convertedBuilder.build();
}
}
default:
throw new DataException("Unknown schema type: " + schema.type());
}
} catch (ClassCastException e) {
throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass());
}
}