in hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java [965:1088]
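/**
 * Recursively reads the field at {@code index} from a parquet {@link Group} and converts it to a
 * Java value: MAP / MAP_KEY_VALUE becomes a JSONObject, LIST becomes a JSONArray, DECIMAL becomes
 * a BigDecimal, DATE becomes a java.sql.Date, INT96 becomes a Timestamp (interpreted as UTC or
 * local time depending on {@code isUtcTimestamp}), and every other type falls back to its string
 * representation via {@code getValueToString}.
 */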
private Object readFields(Group g, org.apache.parquet.schema.Type type, int index, Map<String, ParquetMeta> parquetMetaMap, boolean isUtcTimestamp) {
org.apache.parquet.schema.OriginalType originalType = this.getOriginalType(type, parquetMetaMap);
if (originalType == org.apache.parquet.schema.OriginalType.MAP || originalType == org.apache.parquet.schema.OriginalType.MAP_KEY_VALUE) {
Group groupData = g.getGroup(index, 0);
List<org.apache.parquet.schema.Type> parquetTypes = groupData.getType().getFields();
JSONObject data = new JSONObject();
for (int i = 0; i < parquetTypes.size(); i++) {
// number of key-value pairs in the map
int pairCount = groupData.getFieldRepetitionCount(i);
for (int k = 0; k < pairCount; k++) {
Group groupDataK = groupData.getGroup(0, k);
List<org.apache.parquet.schema.Type> parquetTypesK = groupDataK.getType().getFields();
if (2 != parquetTypesK.size()) {
// warn: entries do not come as key-value pairs
throw new RuntimeException(String.format("bad parquet map type: %s", groupData.getValueToString(index, 0)));
}
Object subDataKey = this.readFields(groupDataK, parquetTypesK.get(0), 0, parquetMetaMap, isUtcTimestamp);
Object subDataValue = this.readFields(groupDataK, parquetTypesK.get(1), 1, parquetMetaMap, isUtcTimestamp);
// the sub-field named "key" is the map key; otherwise the pair is stored in reverse order
if (StringUtils.equalsIgnoreCase("key", parquetTypesK.get(0).getName())) {
data.put(subDataKey.toString(), subDataValue);
} else {
data.put(subDataValue.toString(), subDataKey);
}
}
}
return data;
} else if (originalType == org.apache.parquet.schema.OriginalType.LIST) {
Group groupData = g.getGroup(index, 0);
List<org.apache.parquet.schema.Type> parquetTypes = groupData.getType().getFields();
JSONArray data = new JSONArray();
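// convert each sub-field of the LIST wrapper group recursively and collect the results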
for (int i = 0; i < parquetTypes.size(); i++) {
Object subData = this.readFields(groupData, parquetTypes.get(i), i, parquetMetaMap, isUtcTimestamp);
data.add(subData);
}
return data;
} else if (originalType == org.apache.parquet.schema.OriginalType.DECIMAL) {
Binary binaryData = g.getBinary(index, 0);
if (null == binaryData) {
return null;
} else {
// decode the unscaled bytes using the scale declared in the parquet schema
org.apache.hadoop.hive.serde2.io.HiveDecimalWritable decimalWritable = new org.apache.hadoop.hive.serde2.io.HiveDecimalWritable(binaryData.getBytes(), this.asPrimitiveType(type, parquetMetaMap).getDecimalMetadata().getScale());
HiveDecimal hiveDecimal = decimalWritable.getHiveDecimal();
if (null == hiveDecimal) {
return null;
} else {
return hiveDecimal.bigDecimalValue();
}
}
} else if (originalType == org.apache.parquet.schema.OriginalType.DATE) {
// DATE is stored as an INT32 count of days since the Unix epoch
return java.sql.Date.valueOf(LocalDate.ofEpochDay(g.getInteger(index, 0)));
} else if (originalType == org.apache.parquet.schema.OriginalType.UTF8) {
return g.getValueToString(index, 0);
} else {
if (type.isPrimitive() && PrimitiveType.PrimitiveTypeName.INT96 == this.asPrimitiveType(type, parquetMetaMap).getPrimitiveTypeName()) {
// INT96 carries an Impala/Hive style timestamp: 8 little-endian bytes of nanos-of-day followed by a 4-byte Julian day number
Binary dataInt96 = g.getInt96(index, 0);
if (null == dataInt96) {
return null;
}
ByteBuffer buf = dataInt96.toByteBuffer();
buf.order(ByteOrder.LITTLE_ENDIAN);
long timeOfDayNanos = buf.getLong();
int julianDay = buf.getInt();
if (isUtcTimestamp) {
// interpret the timestamp as UTC
LocalDate localDate = LocalDate.ofEpochDay(julianDay - JULIAN_EPOCH_OFFSET_DAYS);
LocalTime localTime = LocalTime.ofNanoOfDay(timeOfDayNanos);
return Timestamp.valueOf(LocalDateTime.of(localDate, localTime));
} else {
// interpret the timestamp in the local time zone
long millis = julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
Timestamp timestamp = new Timestamp(millis);
timestamp.setNanos((int) (timeOfDayNanos % TimeUnit.SECONDS.toNanos(1)));
return timestamp;
}
}
// every other primitive type (BINARY, BOOLEAN, DOUBLE, FIXED_LEN_BYTE_ARRAY, FLOAT, INT32, INT64) and any remaining group type falls back to its string representation
return g.getValueToString(index, 0);
}
}
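// Worked example of the INT96 branch above, as a sketch (it assumes JULIAN_EPOCH_OFFSET_DAYS is
// the Julian day number of the Unix epoch, 1970-01-01): a value holding
// timeOfDayNanos = 3_600_000_000_000L (i.e. 01:00:00) and julianDay = JULIAN_EPOCH_OFFSET_DAYS + 1
// decodes, on the UTC path, to LocalDate.ofEpochDay(1) = 1970-01-02 plus LocalTime 01:00:00,
// i.e. Timestamp.valueOf("1970-01-02 01:00:00").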