int populateRecord()

in server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroResolver.java [216:338]


    int populateRecord(List<OneField> record, Object fieldValue,
                       Schema fieldSchema, DataType gpdbColType) {

        Schema.Type fieldType = fieldSchema.getType();

        int ret = 0;
        LogicalType logicalType = fieldSchema.getLogicalType();

        switch (fieldType) {
            case ARRAY:
                if (fieldValue == null) {
                    return addOneFieldToRecord(record, gpdbColType, null);
                }
                List<OneField> listRecord = new LinkedList<>();
                ret = setArrayField(listRecord, fieldValue, fieldSchema, gpdbColType);
                DataType type;
                String formatType;

                if (gpdbColType.isArrayType()) {
                    type = gpdbColType;
                    formatType = "{%s}";
                } else {
                    type = DataType.TEXT;
                    formatType = "[%s]";
                }

                addOneFieldToRecord(record, type, String.format(formatType,
                        HdfsUtilities.toString(listRecord, collectionDelim)));
                break;
            case MAP:
                if (fieldValue == null) {
                    return addOneFieldToRecord(record, DataType.TEXT, null);
                }
                List<OneField> mapRecord = new LinkedList<>();
                ret = setMapField(mapRecord, fieldValue, fieldSchema);
                addOneFieldToRecord(record, DataType.TEXT, String.format("{%s}",
                        HdfsUtilities.toString(mapRecord, collectionDelim)));
                break;
            case RECORD:
                if (fieldValue == null) {
                    return addOneFieldToRecord(record, DataType.TEXT, null);
                }
                List<OneField> recRecord = new LinkedList<>();
                ret = setRecordField(recRecord, fieldValue, fieldSchema);
                addOneFieldToRecord(record, DataType.TEXT, String.format("{%s}",
                        HdfsUtilities.toString(recRecord, collectionDelim)));
                break;
            case UNION:
                /*
                 * When an Avro field is actually a union, we resolve the type
                 * of the union element, and delegate the record update via
                 * recursion
                 */

                int unionIndex = GenericData.get().resolveUnion(fieldSchema, fieldValue);
                /*
                 * Retrieve index of the non null data type from the type array
                 * if value is null
                 */
                if (fieldValue == null) {
                    unionIndex ^= 1; // exclusive or assignment
                }
                ret = populateRecord(record, fieldValue, fieldSchema.getTypes().get(unionIndex), gpdbColType);
                break;
            case ENUM:
                ret = addOneFieldToRecord(record, DataType.TEXT, fieldValue);
                break;
            case INT:
                if (logicalType == LogicalTypes.date()) {
                    fieldValue = avroTypeConverter.dateFromInt((int) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.timeMillis()) {
                    fieldValue = avroTypeConverter.timeMillis((int) fieldValue, fieldSchema, logicalType);
                }

                DataType gpdbWritableDataType = (logicalType != null) ? gpdbColType : DataType.INTEGER;
                ret = addOneFieldToRecord(record, gpdbWritableDataType, fieldValue);
                break;
            case DOUBLE:
                ret = addOneFieldToRecord(record, DataType.FLOAT8, fieldValue);
                break;
            case STRING:
                String str = (fieldValue != null) ? fieldValue.toString() : null;
                fieldValue = (gpdbColType.isArrayType()) ? pgUtilities.escapeArrayElement(str) : str;
                if(logicalType == LogicalTypes.uuid()){
                    ret = addOneFieldToRecord(record, DataType.UUID, fieldValue);
                } else {
                    ret = addOneFieldToRecord(record, DataType.TEXT, fieldValue);
                }
                break;
            case FLOAT:
                ret = addOneFieldToRecord(record, DataType.REAL, fieldValue);
                break;
            case LONG:
                gpdbWritableDataType = (logicalType != null) ? gpdbColType : DataType.BIGINT;
                if (logicalType == LogicalTypes.timeMicros()) {
                    fieldValue = avroTypeConverter.timeMicros((long) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.timestampMillis()) {
                    fieldValue = avroTypeConverter.timestampMillis((long) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.timestampMicros()) {
                    fieldValue = avroTypeConverter.timestampMicros((long) fieldValue,fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.localTimestampMillis()) {
                    fieldValue = avroTypeConverter.localTimestampMillis((long) fieldValue, fieldSchema, logicalType);
                } else if (logicalType == LogicalTypes.localTimestampMicros()) {
                    fieldValue = avroTypeConverter.localTimestampMicros((long) fieldValue, fieldSchema, logicalType);
                }
                ret = addOneFieldToRecord(record, gpdbWritableDataType, fieldValue);
                break;
            case BYTES:
            case FIXED:
                if (logicalType != null && logicalType.getName().equalsIgnoreCase("decimal")) {
                    fieldValue = avroTypeConverter.convertToDecimal(fieldValue, fieldSchema, logicalType);
                }
                DataType gpdbWritableType = (gpdbColType == DataType.TEXT) ? DataType.BYTEA : gpdbColType;
                ret = addOneFieldToRecord(record, gpdbWritableType, fieldValue);
                break;
            case BOOLEAN:
                ret = addOneFieldToRecord(record, DataType.BOOLEAN, fieldValue);
                break;
            default:
                break;
        }
        return ret;
    }