in samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java [228:277]
public static Object convertToJavaObject(Object avroObj, Schema schema) {
if (avroObj == null) {
return null;
}
switch (schema.getType()) {
case RECORD:
return convertToRelRecord((IndexedRecord) avroObj);
case ARRAY: {
ArrayList<Object> retVal = new ArrayList<>();
List<Object> avroArray;
if (avroObj instanceof GenericData.Array) {
avroArray = (GenericData.Array) avroObj;
} else if (avroObj instanceof List) {
avroArray = (List) avroObj;
} else {
throw new SamzaException("Unsupported array type " + avroObj.getClass().getSimpleName());
}
retVal.addAll(avroArray.stream()
.map(v -> convertToJavaObject(v, getNonNullUnionSchema(schema).getElementType()))
.collect(Collectors.toList()));
return retVal;
}
case MAP: {
Map<String, Object> retVal = new HashMap<>();
retVal.putAll(((Map<String, ?>) avroObj).entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> convertToJavaObject(e.getValue(), getNonNullUnionSchema(schema).getValueType()))));
return retVal;
}
case UNION:
for (Schema unionSchema : schema.getTypes()) {
if (isSchemaCompatibleWithAvroObj(avroObj, unionSchema)) {
return convertToJavaObject(avroObj, unionSchema);
}
}
return null;
case ENUM:
return avroObj.toString();
case FIXED:
org.apache.avro.generic.GenericFixed fixed = (org.apache.avro.generic.GenericFixed) avroObj;
return new ByteString(fixed.bytes());
case BYTES:
return new ByteString(((ByteBuffer) avroObj).array());
default:
return avroObj;
}
}