in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverterUtils.java [660:868]
/**
 * Converts a Kafka Connect value into the object representation used for Avro serialization.
 *
 * <p>The Connect type is taken from {@code schema} when one is supplied; otherwise it is derived
 * from {@code logicalValue} via {@code getSchemaFromLogicalValue}. Conversion rules implemented
 * here:
 * <ul>
 *   <li>Decimal / Date / Time / Timestamp logical types (matched on the schema name) are first
 *       reduced to their underlying Connect representation.</li>
 *   <li>INT8 and INT16 are widened to {@link Integer}; INT32, INT64, FLOAT32, FLOAT64 and BOOLEAN
 *       are returned as their boxed Java types.</li>
 *   <li>STRING is returned as-is, unless the schema carries the {@code AVRO_TYPE_ENUM} parameter,
 *       in which case the result of {@code enumSymbol} is returned.</li>
 *   <li>BYTES becomes a {@link ByteBuffer}, or a {@code GenericData.Fixed} when
 *       {@code isFixedSchema(schema)} holds.</li>
 *   <li>ARRAY becomes a {@link List} of recursively converted elements.</li>
 *   <li>MAP with non-optional STRING keys becomes a {@code Map<String, Object>}; any other map
 *       becomes a list of key/value {@code GenericRecord} entries.</li>
 *   <li>STRUCT named {@code AVRO_TYPE_UNION} is unwrapped to its first non-null field; any other
 *       STRUCT becomes a {@code GenericRecord}.</li>
 * </ul>
 *
 * @param schema the Connect schema describing {@code logicalValue}; may be {@code null} for
 *        schemaless data
 * @param avroSchema the Avro schema corresponding to {@code schema}
 * @param logicalValue the Connect value to convert; may be {@code null}
 * @param buildGenericRecord when the value is absent (or its type cannot be determined), return
 *        an empty record built from {@code ANYTHING_SCHEMA} instead of {@code null}
 * @return the converted value, {@code null}, or an empty {@code ANYTHING_SCHEMA} record
 * @throws DataException if the value is {@code null} for a non-optional schema, if the value's
 *         Java type does not match the schema, if a struct's schema differs from {@code schema},
 *         if a struct field or fixed union member is missing from the Avro schema, or if the
 *         schema type is not handled
 */
public Object fromConnectData(Schema schema, org.apache.avro.Schema avroSchema,
    Object logicalValue, boolean buildGenericRecord) {
  Schema.Type schemaType =
      schema != null ? schema.type() : getSchemaFromLogicalValue(logicalValue);
  if (schemaType == null) {
    // No schema and no type could be derived from the value.
    if (buildGenericRecord) {
      return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
    }
    return null;
  }
  if (logicalValue == null) {
    // Guard against a null schema so the optionality check can never raise an NPE.
    if (schema != null && !schema.isOptional()) {
      throw new DataException("Found null value for non-optional schema");
    }
    if (buildGenericRecord) {
      return new GenericRecordBuilder(ANYTHING_SCHEMA).build();
    }
    return null;
  }

  Object bytesValue = logicalValue;
  // Logical Type Check: reduce Connect logical types to their underlying representation.
  if (schema != null && schema.name() != null) {
    switch (schema.name()) {
      case Decimal.LOGICAL_NAME:
        if (!(bytesValue instanceof BigDecimal)) {
          throw new DataException("Can't convert type for Decimal, expected BigDecimal but got "
              + bytesValue.getClass());
        }
        bytesValue = Decimal.fromLogical(schema, (BigDecimal) logicalValue);
        break;
      case Date.LOGICAL_NAME:
        if (!(bytesValue instanceof java.util.Date)) {
          throw new DataException(
              "Can't convert type for Date, expected Date but got " + bytesValue.getClass());
        }
        bytesValue = Date.fromLogical(schema, (java.util.Date) logicalValue);
        break;
      case Time.LOGICAL_NAME:
        if (!(bytesValue instanceof java.util.Date)) {
          throw new DataException(
              "Can't convert type for Time, expected Date but got " + bytesValue.getClass());
        }
        bytesValue = Time.fromLogical(schema, (java.util.Date) logicalValue);
        break;
      case Timestamp.LOGICAL_NAME:
        if (!(bytesValue instanceof java.util.Date)) {
          throw new DataException(
              "Can't convert type for Timestamp, expected Date but got " + bytesValue.getClass());
        }
        bytesValue = Timestamp.fromLogical(schema, (java.util.Date) logicalValue);
        break;
      default:
        break;
    }
  }

  try {
    switch (schemaType) {
      case INT8: {
        Byte byteValue = (Byte) bytesValue;
        Integer convertedByteValue = byteValue == null ? null : byteValue.intValue();
        return convertedByteValue;
      }
      case INT16: {
        Short shortValue = (Short) bytesValue;
        Integer convertedShortValue = shortValue == null ? null : shortValue.intValue();
        return convertedShortValue;
      }
      // The casts below are kept on purpose: they are what detects a value/schema type mismatch.
      case INT32:
        return (Integer) bytesValue;
      case INT64:
        return (Long) bytesValue;
      case FLOAT32:
        return (Float) bytesValue;
      case FLOAT64:
        return (Double) bytesValue;
      case BOOLEAN:
        return (Boolean) bytesValue;
      case STRING:
        if (schema != null && schema.parameters() != null
            && schema.parameters().containsKey(AVRO_TYPE_ENUM)) {
          String enumSchemaName = schema.parameters().get(AVRO_TYPE_ENUM);
          return enumSymbol(avroSchema, bytesValue, enumSchemaName);
        }
        return (String) bytesValue;
      case BYTES: {
        bytesValue = bytesValue instanceof byte[] ? ByteBuffer.wrap((byte[]) bytesValue)
            : (ByteBuffer) bytesValue;
        if (schema != null && isFixedSchema(schema)) {
          int size = Integer.parseInt(schema.parameters().get(CONNECT_AVRO_FIXED_SIZE_PROP));
          org.apache.avro.Schema fixedSchema = null;
          if (avroSchema.getType() == org.apache.avro.Schema.Type.UNION) {
            // Pick the union member that is a FIXED of the right size and whose union field
            // name matches; if several match, the last one wins.
            int index = 0;
            for (org.apache.avro.Schema memberSchema : avroSchema.getTypes()) {
              if (memberSchema.getType() == org.apache.avro.Schema.Type.FIXED
                  && memberSchema.getFixedSize() == size
                  && unionMemberFieldName(memberSchema, index)
                      .equals(unionMemberFieldName(schema, index))) {
                fixedSchema = memberSchema;
              }
              index++;
            }
            if (fixedSchema == null) {
              throw new DataException("Fixed size " + size + " not found in union " + avroSchema);
            }
          } else {
            fixedSchema = avroSchema;
          }
          // NOTE(review): ByteBuffer.array() exposes the whole backing array and ignores
          // position/limit/offset - confirm callers only pass buffers covering the full array.
          bytesValue = new GenericData.Fixed(fixedSchema, ((ByteBuffer) bytesValue).array());
        }
        return bytesValue;
      }
      case ARRAY: {
        Collection<?> list = (Collection<?>) bytesValue;
        List<Object> arrayValue = new ArrayList<>(list.size());
        Schema arrayElementSchema = schema != null ? schema.valueSchema() : null;
        org.apache.avro.Schema underlyingAvroSchema = getAvroSchema(schema, avroSchema);
        org.apache.avro.Schema arrayElementAvroSchema =
            schema != null ? underlyingAvroSchema.getElementType() : ANYTHING_SCHEMA;
        for (Object val : list) {
          arrayValue.add(fromConnectData(arrayElementSchema, arrayElementAvroSchema, val, false));
        }
        return arrayValue;
      }
      case MAP: {
        Map<?, ?> map = (Map<?, ?>) bytesValue;
        org.apache.avro.Schema underlyingAvroSchema;
        if (schema != null && schema.keySchema().type() == Schema.Type.STRING
            && (!schema.keySchema().isOptional())) {
          // Non-optional string keys: emit a plain string-keyed map.
          underlyingAvroSchema = getAvroSchema(schema, avroSchema);
          Map<String, Object> mapValue = new HashMap<>();
          for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object convertedValue = fromConnectData(schema.valueSchema(),
                underlyingAvroSchema.getValueType(), entry.getValue(), false);
            mapValue.put((String) entry.getKey(), convertedValue);
          }
          return mapValue;
        } else {
          // Any other key type (or schemaless data): emit a list of key/value entry records.
          List<GenericRecord> mapValue = new ArrayList<>(map.size());
          underlyingAvroSchema = getAvroSchemaWithMapEntry(schema, avroSchema);
          org.apache.avro.Schema elementSchema =
              schema != null ? underlyingAvroSchema.getElementType()
                  : ANYTHING_SCHEMA_MAP_ELEMENT;
          org.apache.avro.Schema avroKeySchema = elementSchema.getField(KEY_FIELD).schema();
          org.apache.avro.Schema avroValueSchema = elementSchema.getField(VALUE_FIELD).schema();
          for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object keyConverted = fromConnectData(schema != null ? schema.keySchema() : null,
                avroKeySchema, entry.getKey(), false);
            Object valueConverted = fromConnectData(schema != null ? schema.valueSchema() : null,
                avroValueSchema, entry.getValue(), false);
            mapValue.add(new GenericRecordBuilder(elementSchema).set(KEY_FIELD, keyConverted)
                .set(VALUE_FIELD, valueConverted).build());
          }
          return mapValue;
        }
      }
      case STRUCT: {
        Struct struct = (Struct) bytesValue;
        if (!struct.schema().equals(schema)) {
          throw new DataException("Mismatching struct schema");
        }
        if (AVRO_TYPE_UNION.equals(schema.name())) {
          // Union wrapper struct: the first non-null field carries the actual value.
          for (Field field : schema.fields()) {
            Object object = struct.get(field);
            if (object != null) {
              return fromConnectData(field.schema(), avroSchema, object, false);
            }
          }
          return fromConnectData(schema, avroSchema, null, true);
        } else {
          org.apache.avro.Schema underlyingAvroSchema = getAvroSchema(schema, avroSchema);
          GenericRecordBuilder convertedBuilder = new GenericRecordBuilder(underlyingAvroSchema);
          for (Field field : schema.fields()) {
            String fieldName = field.name();
            org.apache.avro.Schema.Field schemaField = underlyingAvroSchema.getField(fieldName);
            if (schemaField == null) {
              // getField returns null for an unknown name; fail with a descriptive error
              // instead of a bare NullPointerException.
              throw new DataException("Field " + fieldName + " not found in Avro schema "
                  + underlyingAvroSchema.getFullName());
            }
            Object fieldValue = struct.get(field);
            convertedBuilder.set(fieldName,
                fromConnectData(field.schema(), schemaField.schema(), fieldValue, false));
          }
          return convertedBuilder.build();
        }
      }
      default:
        // Use the resolved type: schema itself may be null for schemaless data.
        throw new DataException("Unknown schema type: " + schemaType);
    }
  } catch (ClassCastException e) {
    // Report via the resolved type (schema may be null) and keep the original cause.
    throw new DataException("Invalid type for " + schemaType + ": " + bytesValue.getClass(), e);
  }
}