private Object readFields()

in hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java [965:1088]


    /**
     * Converts one field of a Parquet record group into a Java value.
     *
     * <p>Only the first repetition (position 0) of the field at {@code index} is read. The result depends on
     * the annotation reported by {@code getOriginalType} for {@code type}:
     * <ul>
     *   <li>{@code MAP} / {@code MAP_KEY_VALUE}: a {@link JSONObject} with one property per map entry; keys are
     *       the {@code toString()} of the converted key, values are converted recursively;</li>
     *   <li>{@code LIST}: a {@link JSONArray} holding one recursively converted value per field of the list
     *       group;</li>
     *   <li>{@code DECIMAL}: a {@link java.math.BigDecimal}, or {@code null} when the binary value or the Hive
     *       decimal built from it is {@code null};</li>
     *   <li>{@code DATE}: a {@link java.sql.Date} built from the stored epoch-day count;</li>
     *   <li>{@code UTF8}: the value rendered by {@code getValueToString};</li>
     *   <li>no matching annotation, primitive INT96: a {@link Timestamp}, or {@code null} when the stored
     *       binary is {@code null};</li>
     *   <li>anything else: the value rendered by {@code getValueToString}.</li>
     * </ul>
     *
     * @param g              group that contains the field
     * @param type           Parquet type of the field
     * @param index          position of the field inside {@code g}
     * @param parquetMetaMap metadata passed through to {@code getOriginalType} and {@code asPrimitiveType}
     * @param isUtcTimestamp selects which of the two INT96 conversions below is used
     * @return the converted value; may be {@code null}
     * @throws RuntimeException if a map entry does not consist of exactly two fields
     */
    private Object readFields(Group g, org.apache.parquet.schema.Type type, int index, Map<String, ParquetMeta> parquetMetaMap, boolean isUtcTimestamp) {
        // Resolve the annotation once instead of once per branch.
        org.apache.parquet.schema.OriginalType originalType = this.getOriginalType(type, parquetMetaMap);

        // MAP and MAP_KEY_VALUE are decoded identically.
        if (originalType == org.apache.parquet.schema.OriginalType.MAP
                || originalType == org.apache.parquet.schema.OriginalType.MAP_KEY_VALUE) {
            Group mapGroup = g.getGroup(index, 0);
            int mapFieldCount = mapGroup.getType().getFields().size();
            JSONObject data = new JSONObject();
            for (int i = 0; i < mapFieldCount; i++) {
                // number of key/value pairs stored under this field
                int entryCount = mapGroup.getFieldRepetitionCount(i);
                for (int k = 0; k < entryCount; k++) {
                    // Read from the same field the repetition count was taken from.
                    Group entry = mapGroup.getGroup(i, k);
                    List<org.apache.parquet.schema.Type> entryTypes = entry.getType().getFields();
                    if (2 != entryTypes.size()) {
                        // The entry is not a key/value pair. Report the entry itself: `index` addresses a field of
                        // `g`, so using it against the map group could fail while building the message.
                        throw new RuntimeException(String.format("bad parquet map type: %s", entry));
                    }
                    Object first = this.readFields(entry, entryTypes.get(0), 0, parquetMetaMap, isUtcTimestamp);
                    Object second = this.readFields(entry, entryTypes.get(1), 1, parquetMetaMap, isUtcTimestamp);
                    if (StringUtils.equalsIgnoreCase("key", entryTypes.get(0).getName())) {
                        data.put(first.toString(), second);
                    } else {
                        // The first field is not named "key", so the second field is used as the key.
                        data.put(second.toString(), first);
                    }
                }
            }
            return data;
        }

        if (originalType == org.apache.parquet.schema.OriginalType.LIST) {
            Group listGroup = g.getGroup(index, 0);
            List<org.apache.parquet.schema.Type> listFieldTypes = listGroup.getType().getFields();
            JSONArray data = new JSONArray();
            // NOTE(review): each field is converted through readFields, which reads repetition 0 only, so a
            // repeated element field with several values appears to contribute just its first value - confirm
            // against real multi-element lists before changing, since callers see the current shape.
            for (int i = 0; i < listFieldTypes.size(); i++) {
                data.add(this.readFields(listGroup, listFieldTypes.get(i), i, parquetMetaMap, isUtcTimestamp));
            }
            return data;
        }

        if (originalType == org.apache.parquet.schema.OriginalType.DECIMAL) {
            PrimitiveType decimalType = this.asPrimitiveType(type, parquetMetaMap);
            PrimitiveType.PrimitiveTypeName physicalType = decimalType.getPrimitiveTypeName();
            // DECIMAL may be stored as INT32 / INT64 holding the unscaled value; those cannot be read as binary.
            if (PrimitiveType.PrimitiveTypeName.INT32 == physicalType) {
                return java.math.BigDecimal.valueOf(g.getInteger(index, 0), decimalType.getDecimalMetadata().getScale());
            }
            if (PrimitiveType.PrimitiveTypeName.INT64 == physicalType) {
                return java.math.BigDecimal.valueOf(g.getLong(index, 0), decimalType.getDecimalMetadata().getScale());
            }
            // Remaining storage types carry the unscaled value as bytes.
            Binary decimalBytes = g.getBinary(index, 0);
            if (null == decimalBytes) {
                return null;
            }
            org.apache.hadoop.hive.serde2.io.HiveDecimalWritable decimalWritable = new org.apache.hadoop.hive.serde2.io.HiveDecimalWritable(decimalBytes.getBytes(), decimalType.getDecimalMetadata().getScale());
            HiveDecimal hiveDecimal = decimalWritable.getHiveDecimal();
            return null == hiveDecimal ? null : hiveDecimal.bigDecimalValue();
        }

        if (originalType == org.apache.parquet.schema.OriginalType.DATE) {
            // The stored int is a day count relative to the epoch.
            return java.sql.Date.valueOf(LocalDate.ofEpochDay(g.getInteger(index, 0)));
        }

        if (originalType == org.apache.parquet.schema.OriginalType.UTF8) {
            return g.getValueToString(index, 0);
        }

        // No annotation handled above: INT96 is the only primitive that needs decoding.
        if (type.isPrimitive()
                && PrimitiveType.PrimitiveTypeName.INT96 == this.asPrimitiveType(type, parquetMetaMap).getPrimitiveTypeName()) {
            Binary dataInt96 = g.getInt96(index, 0);
            if (null == dataInt96) {
                return null;
            }
            // Little-endian layout: a long with the nanoseconds of the day, then an int with the julian day.
            ByteBuffer buf = dataInt96.toByteBuffer();
            buf.order(ByteOrder.LITTLE_ENDIAN);
            long timeOfDayNanos = buf.getLong();
            int julianDay = buf.getInt();
            if (isUtcTimestamp) {
                // UTC
                LocalDate localDate = LocalDate.ofEpochDay(julianDay - JULIAN_EPOCH_OFFSET_DAYS);
                LocalTime localTime = LocalTime.ofNanoOfDay(timeOfDayNanos);
                return Timestamp.valueOf(LocalDateTime.of(localDate, localTime));
            }
            // local time
            long mills = julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
            Timestamp timestamp = new Timestamp(mills);
            // Restore the sub-second part at nanosecond precision; the millisecond constructor keeps only millis.
            timestamp.setNanos((int) (timeOfDayNanos % TimeUnit.SECONDS.toNanos(1)));
            return timestamp;
        }

        // Every other primitive, and any non-primitive without a handled annotation, is returned as text.
        return g.getValueToString(index, 0);
    }