in server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroResolver.java [216:338]
/**
 * Converts one Avro field value into Greenplum field(s) and appends them to {@code record}.
 *
 * <p>Complex types (ARRAY, MAP, RECORD) are flattened into a single delimited string,
 * UNION is resolved to one of its branches and handled recursively, and primitive types
 * are written with the matching {@link DataType}. Avro logical types on INT/LONG values
 * are converted through {@code avroTypeConverter} before being written.
 *
 * @param record      the output list the converted field(s) are appended to
 * @param fieldValue  the Avro value to convert; may be {@code null} (for example the null
 *                    branch of a nullable union)
 * @param fieldSchema the Avro schema describing {@code fieldValue}
 * @param gpdbColType the data type of the target Greenplum column
 * @return the value returned by the helper that populated the record
 *         ({@code addOneFieldToRecord}, {@code setArrayField}, {@code setMapField} or
 *         {@code setRecordField}), or 0 when the schema type is not handled
 *         (presumably a count of fields added; confirm against those helpers)
 */
int populateRecord(List<OneField> record, Object fieldValue,
                   Schema fieldSchema, DataType gpdbColType) {
    Schema.Type fieldType = fieldSchema.getType();
    int ret = 0;
    LogicalType logicalType = fieldSchema.getLogicalType();
    switch (fieldType) {
        case ARRAY:
            if (fieldValue == null) {
                return addOneFieldToRecord(record, gpdbColType, null);
            }
            List<OneField> listRecord = new LinkedList<>();
            ret = setArrayField(listRecord, fieldValue, fieldSchema, gpdbColType);
            DataType type;
            String formatType;
            if (gpdbColType.isArrayType()) {
                // target column is a real array: keep its type and use "{...}" delimiters
                type = gpdbColType;
                formatType = "{%s}";
            } else {
                // otherwise the array is serialized as bracketed text
                type = DataType.TEXT;
                formatType = "[%s]";
            }
            addOneFieldToRecord(record, type, String.format(formatType,
                    HdfsUtilities.toString(listRecord, collectionDelim)));
            break;
        case MAP:
            if (fieldValue == null) {
                return addOneFieldToRecord(record, DataType.TEXT, null);
            }
            List<OneField> mapRecord = new LinkedList<>();
            ret = setMapField(mapRecord, fieldValue, fieldSchema);
            addOneFieldToRecord(record, DataType.TEXT, String.format("{%s}",
                    HdfsUtilities.toString(mapRecord, collectionDelim)));
            break;
        case RECORD:
            if (fieldValue == null) {
                return addOneFieldToRecord(record, DataType.TEXT, null);
            }
            List<OneField> recRecord = new LinkedList<>();
            ret = setRecordField(recRecord, fieldValue, fieldSchema);
            addOneFieldToRecord(record, DataType.TEXT, String.format("{%s}",
                    HdfsUtilities.toString(recRecord, collectionDelim)));
            break;
        case UNION:
            /*
             * When an Avro field is actually a union, we resolve the type
             * of the union element, and delegate the record update via
             * recursion. For a null value the non-null branch is used so
             * that the null is written with the column's real type.
             */
            int unionIndex = resolveUnionBranchIndex(fieldSchema, fieldValue);
            ret = populateRecord(record, fieldValue, fieldSchema.getTypes().get(unionIndex), gpdbColType);
            break;
        case ENUM:
            ret = addOneFieldToRecord(record, DataType.TEXT, fieldValue);
            break;
        case INT:
            // A null value (reached through a nullable union) has nothing to convert;
            // unboxing it with (int) would throw NullPointerException.
            if (fieldValue != null) {
                if (logicalType == LogicalTypes.date()) {
                    fieldValue = avroTypeConverter.dateFromInt((int) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.timeMillis()) {
                    fieldValue = avroTypeConverter.timeMillis((int) fieldValue, fieldSchema, logicalType);
                }
            }
            // with a logical type the value is written using the target column's type
            DataType intWritableType = (logicalType != null) ? gpdbColType : DataType.INTEGER;
            ret = addOneFieldToRecord(record, intWritableType, fieldValue);
            break;
        case DOUBLE:
            ret = addOneFieldToRecord(record, DataType.FLOAT8, fieldValue);
            break;
        case STRING:
            String str = (fieldValue != null) ? fieldValue.toString() : null;
            // elements of a Greenplum array column are escaped before being written
            fieldValue = (gpdbColType.isArrayType()) ? pgUtilities.escapeArrayElement(str) : str;
            if (logicalType == LogicalTypes.uuid()) {
                ret = addOneFieldToRecord(record, DataType.UUID, fieldValue);
            } else {
                ret = addOneFieldToRecord(record, DataType.TEXT, fieldValue);
            }
            break;
        case FLOAT:
            ret = addOneFieldToRecord(record, DataType.REAL, fieldValue);
            break;
        case LONG:
            // Same null guard as INT: (long) on a null value would throw NullPointerException.
            if (fieldValue != null) {
                if (logicalType == LogicalTypes.timeMicros()) {
                    fieldValue = avroTypeConverter.timeMicros((long) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.timestampMillis()) {
                    fieldValue = avroTypeConverter.timestampMillis((long) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.timestampMicros()) {
                    fieldValue = avroTypeConverter.timestampMicros((long) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.localTimestampMillis()) {
                    fieldValue = avroTypeConverter.localTimestampMillis((long) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.localTimestampMicros()) {
                    fieldValue = avroTypeConverter.localTimestampMicros((long) fieldValue, fieldSchema, logicalType);
                }
            }
            // with a logical type the value is written using the target column's type
            DataType longWritableType = (logicalType != null) ? gpdbColType : DataType.BIGINT;
            ret = addOneFieldToRecord(record, longWritableType, fieldValue);
            break;
        case BYTES:
        case FIXED:
            if (logicalType != null && logicalType.getName().equalsIgnoreCase("decimal")) {
                fieldValue = avroTypeConverter.convertToDecimal(fieldValue, fieldSchema, logicalType);
            }
            // binary data targeted at a TEXT column is written as BYTEA
            DataType gpdbWritableType = (gpdbColType == DataType.TEXT) ? DataType.BYTEA : gpdbColType;
            ret = addOneFieldToRecord(record, gpdbWritableType, fieldValue);
            break;
        case BOOLEAN:
            ret = addOneFieldToRecord(record, DataType.BOOLEAN, fieldValue);
            break;
        default:
            // unhandled schema types add nothing to the record and yield 0
            break;
    }
    return ret;
}

/**
 * Chooses the index of the union branch used to populate the record.
 *
 * <p>For a non-null value this is the branch Avro resolves the value to. For a null
 * value Avro resolves to the null branch, so a non-null branch is chosen instead:
 * the neighbouring branch ({@code index ^ 1}) when it exists, otherwise the first
 * non-null branch. If the union has no non-null branch, the null branch index is returned.
 *
 * @param unionSchema the UNION schema
 * @param fieldValue  the value being resolved; may be {@code null}
 * @return an index that is always valid for {@code unionSchema.getTypes()}
 */
private static int resolveUnionBranchIndex(Schema unionSchema, Object fieldValue) {
    int resolvedIndex = GenericData.get().resolveUnion(unionSchema, fieldValue);
    if (fieldValue != null) {
        return resolvedIndex;
    }

    List<Schema> branches = unionSchema.getTypes();

    // Keep the original pairing (0 <-> 1, 2 <-> 3, ...) whenever it points at an existing branch.
    int neighbourIndex = resolvedIndex ^ 1;
    if (neighbourIndex < branches.size()) {
        return neighbourIndex;
    }

    // The neighbour is past the end (e.g. null is the last branch of a 3-branch union):
    // fall back to the first branch that is not the null type.
    for (int i = 0; i < branches.size(); i++) {
        if (branches.get(i).getType() != Schema.Type.NULL) {
            return i;
        }
    }

    // Degenerate union containing only null: use the null branch itself.
    return resolvedIndex;
}