in hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [352:504]
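/**
 * Creates a {@link ColumnReader} for the given Flink logical type, resolving the
 * matching parquet column descriptor(s) at the given nesting depth and recursing
 * into constructed types (ARRAY / MAP / ROW).
 */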
private static ColumnReader createColumnReader(
    boolean utcTimestamp,
    LogicalType fieldType,
    Type physicalType,
    List<ColumnDescriptor> columns,
    PageReadStore pages,
    int depth) throws IOException {
  List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
  ColumnDescriptor descriptor = descriptors.get(0);
  PageReader pageReader = pages.getPageReader(descriptor);
  switch (fieldType.getTypeRoot()) {
    case BOOLEAN:
      return new BooleanColumnReader(descriptor, pageReader);
    case TINYINT:
      return new ByteColumnReader(descriptor, pageReader);
    case DOUBLE:
      return new DoubleColumnReader(descriptor, pageReader);
    case FLOAT:
      return new FloatColumnReader(descriptor, pageReader);
    case INTEGER:
    case DATE:
    case TIME_WITHOUT_TIME_ZONE:
      return new IntColumnReader(descriptor, pageReader);
    case BIGINT:
      return new LongColumnReader(descriptor, pageReader);
    case SMALLINT:
      return new ShortColumnReader(descriptor, pageReader);
    case CHAR:
    case VARCHAR:
    case BINARY:
    case VARBINARY:
      return new BytesColumnReader(descriptor, pageReader);
    case TIMESTAMP_WITHOUT_TIME_ZONE:
    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
      // Parquet stores timestamps either as INT64 (precision is taken from the
      // Flink type) or in the legacy INT96 layout.
      switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
        case INT64:
          int precision = fieldType instanceof TimestampType
              ? ((TimestampType) fieldType).getPrecision()
              : ((LocalZonedTimestampType) fieldType).getPrecision();
          return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
        case INT96:
          return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
        default:
          throw new AssertionError();
      }
    case DECIMAL:
      // A decimal's physical representation depends on its precision; the parquet
      // format allows INT32, INT64, BINARY and FIXED_LEN_BYTE_ARRAY encodings.
      switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
        case INT32:
          return new IntColumnReader(descriptor, pageReader);
        case INT64:
          return new LongColumnReader(descriptor, pageReader);
        case BINARY:
          return new BytesColumnReader(descriptor, pageReader);
        case FIXED_LEN_BYTE_ARRAY:
          return new FixedLenBytesColumnReader(descriptor, pageReader);
        default:
          throw new AssertionError();
      }
    case ARRAY:
      ArrayType arrayType = (ArrayType) fieldType;
      if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
        boolean isThreeLevelList = isThreeLevelList(physicalType);
        // 3-level list structure: drill down two levels to reach the type of `element`.
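        // For reference, the two standard parquet LIST layouts (names illustrative):
        //   3-level: optional group f (LIST) { repeated group list { <element> } }
        //   2-level: optional group f (LIST) { repeated <element> }
        // hence the element type sits two levels (resp. one level) below `f`.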
        Type elementType = isThreeLevelList
            ? physicalType.asGroupType().getType(0).asGroupType().getType(0)
            : physicalType.asGroupType().getType(0);
        int elementDepth = isThreeLevelList ? depth + 2 : depth + 1;
        return new ArrayGroupReader(createColumnReader(
            utcTimestamp,
            arrayType.getElementType(),
            elementType,
            descriptors,
            pages,
            elementDepth));
      } else {
        return new ArrayColumnReader(
            descriptor,
            pageReader,
            utcTimestamp,
            descriptor.getPrimitiveType(),
            fieldType);
      }
    case MAP:
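      // For reference, the standard parquet MAP layout (names illustrative):
      //   optional group f (MAP) { repeated group key_value { <key>; <value>; } }
      // so the key is field 0 and the value is field 1 of the `key_value` group.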
      MapType mapType = (MapType) fieldType;
      ArrayColumnReader keyReader =
          new ArrayColumnReader(
              descriptor,
              pageReader,
              utcTimestamp,
              descriptor.getPrimitiveType(),
              new ArrayType(mapType.getKeyType()));
      ColumnReader<WritableColumnVector> valueReader;
      if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
        valueReader = new ArrayGroupReader(createColumnReader(
            utcTimestamp,
            mapType.getValueType(),
            physicalType.asGroupType().getType(0).asGroupType().getType(1), // Get the value physical type
            descriptors.subList(1, descriptors.size()), // remove the key descriptor
            pages,
            depth + 2)); // increase the depth by 2, because there's a key_value entry in the path
      } else {
        valueReader = new ArrayColumnReader(
            descriptors.get(1),
            pages.getPageReader(descriptors.get(1)),
            utcTimestamp,
            descriptors.get(1).getPrimitiveType(),
            new ArrayType(mapType.getValueType()));
      }
      return new MapColumnReader(keyReader, valueReader);
    case ROW:
      RowType rowType = (RowType) fieldType;
      GroupType groupType = physicalType.asGroupType();
      List<ColumnReader> fieldReaders = new ArrayList<>();
      for (int i = 0; i < rowType.getFieldCount(); i++) {
        // Schema evolution: the queried schema may contain a field that was added
        // after this parquet file was written; read such fields as empty columns.
        int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
        if (fieldIndex < 0) {
          fieldReaders.add(new EmptyColumnReader());
        } else {
          // Check for a nested row in an array with an atomic field type.
          // This matches Parquet's field algorithm, which pushes multiplicity and
          // structure down to the individual leaf fields: an array of rows is
          // stored as a separate array for each field.
          // Limitation: this does not work for multiply nested arrays and maps,
          // mainly because the Flink classes and interfaces don't follow that pattern.
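          // For illustration: a field of type ARRAY<ROW<a INT, b STRING>> is stored
          // as one repeated leaf column per row field ("a" and "b"), each carrying
          // the repetition level of the enclosing array; wrapping the atomic field
          // type in an ArrayType below replays that repetition when reading.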
          if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
            fieldReaders.add(
                createColumnReader(
                    utcTimestamp,
                    new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
                    groupType.getType(fieldIndex),
                    descriptors,
                    pages,
                    depth + 1));
          } else {
            fieldReaders.add(
                createColumnReader(
                    utcTimestamp,
                    rowType.getTypeAt(i),
                    groupType.getType(fieldIndex),
                    descriptors,
                    pages,
                    depth + 1));
          }
        }
      }
      return new RowColumnReader(fieldReaders);
    default:
      throw new UnsupportedOperationException(fieldType + " is not supported now.");
  }
}
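
// Illustrative only (not part of the original file): a minimal sketch of how a
// caller might drive createColumnReader, one reader per projected top-level
// field. The helper name and its parameters are hypothetical; the real call
// site lives elsewhere in this class.
private static ColumnReader[] createFieldReaders(
    boolean utcTimestamp,
    LogicalType[] fieldTypes,
    GroupType fileSchema,
    List<ColumnDescriptor> columns,
    PageReadStore pages) throws IOException {
  ColumnReader[] readers = new ColumnReader[fieldTypes.length];
  for (int i = 0; i < fieldTypes.length; i++) {
    // Top-level fields start at depth 0; filterDescriptors then narrows the
    // descriptor list to the columns whose path matches the field, and the
    // recursion above adds 1 (or 2 for list/map wrappers) per nesting level.
    readers[i] = createColumnReader(
        utcTimestamp,
        fieldTypes[i],
        fileSchema.getType(i),
        columns,
        pages,
        0);
  }
  return readers;
}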