in hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [514:681]
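/**
 * A descriptive summary of what the method below does (grounded in the code itself):
 * creates a heap-backed {@link WritableColumnVector} for the given Flink logical type,
 * validating it against the Parquet physical type of the corresponding column(s).
 * For nested types (ARRAY, MAP, ROW) it recurses into the child fields, increasing
 * {@code depth} to track how far it has descended into the Parquet group structure.
 */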
private static WritableColumnVector createWritableColumnVector(
int batchSize,
LogicalType fieldType,
Type physicalType,
List<ColumnDescriptor> columns,
int depth) {
List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
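// Validate the Parquet leaf primitive type against the requested Flink logical type,
// then allocate the matching heap vector.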
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapBooleanVector(batchSize);
case TINYINT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapByteVector(batchSize);
case DOUBLE:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.DOUBLE, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapDoubleVector(batchSize);
case FLOAT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.FLOAT, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapFloatVector(batchSize);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapIntVector(batchSize);
case BIGINT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT64, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapLongVector(batchSize);
case SMALLINT:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapShortVector(batchSize);
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
checkArgument(
typeName == PrimitiveType.PrimitiveTypeName.BINARY, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapBytesVector(batchSize);
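// Timestamps: reject columns annotated as TIME_MICROS (a time-of-day annotation, not a timestamp).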
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
getOriginalTypeCheckFailureMessage(primitiveType.getOriginalType(), fieldType));
return new HeapTimestampVector(batchSize);
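// Decimals must be stored as FIXED_LEN_BYTE_ARRAY or BINARY annotated with the DECIMAL original type.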
case DECIMAL:
checkArgument(
(typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
return new HeapDecimalVector(batchSize);
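// Arrays: nested (constructed) element types are read through a group vector,
// while atomic element types are read directly as a repeated leaf column.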
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
boolean isThreeLevelList = isThreeLevelList(physicalType);
// 3-level list structure: drill down 2 levels to reach the type of the `element` field
Type elementType = isThreeLevelList
? physicalType.asGroupType().getType(0).asGroupType().getType(0)
: physicalType.asGroupType().getType(0);
int elementDepth = isThreeLevelList ? depth + 2 : depth + 1;
return new HeapArrayGroupColumnVector(
batchSize,
createWritableColumnVector(
batchSize,
arrayType.getElementType(),
elementType,
descriptors,
elementDepth));
} else {
return new HeapArrayVector(
batchSize,
createWritableColumnVector(
batchSize,
arrayType.getElementType(),
physicalType,
descriptors,
depth));
}
case MAP:
MapType mapType = (MapType) fieldType;
GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
// the map column uses a three-level layout: map -> repeated key_value -> (key, value),
// so the key and value columns sit at depth + 2.
WritableColumnVector keyColumnVector = createWritableColumnVector(
batchSize,
new ArrayType(mapType.getKeyType().isNullable(), mapType.getKeyType()),
repeatedType.getType(0),
descriptors,
depth + 2);
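// Constructed (nested) value types need a group vector; atomic value types are read
// as a repeated (array) column, mirroring the key handling above.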
WritableColumnVector valueColumnVector;
if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
valueColumnVector = new HeapArrayGroupColumnVector(
batchSize,
createWritableColumnVector(
batchSize,
mapType.getValueType(),
repeatedType.getType(1).asGroupType(),
descriptors,
depth + 2));
} else {
valueColumnVector = createWritableColumnVector(
batchSize,
new ArrayType(mapType.getValueType().isNullable(), mapType.getValueType()),
repeatedType.getType(1),
descriptors,
depth + 2);
}
return new HeapMapColumnVector(batchSize, keyColumnVector, valueColumnVector);
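// Rows: build one child vector per logical field, matching fields by name against the Parquet group type.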
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
WritableColumnVector[] columnVectors = new WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
// schema evolution: the read schema may contain field names that do not exist in the file.
int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
if (fieldIndex < 0) {
// Check for a nested row inside an array with an atomic field type.
// Parquet pushes repetition and structure down to the individual leaf fields,
// so an array of rows is stored as a separate array per field.
// Limitations: this does not work for multiple levels of nested arrays and maps,
// mainly because the Flink vector classes and interfaces do not follow that pattern.
if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
columnVectors[i] = (WritableColumnVector) createVectorFromConstant(
new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), null, batchSize);
} else {
columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
}
} else {
// Check for a nested row inside an array with an atomic field type.
// Parquet pushes repetition and structure down to the individual leaf fields,
// so an array of rows is stored as a separate array per field.
// Limitations: this does not work for multiple levels of nested arrays and maps,
// mainly because the Flink vector classes and interfaces do not follow that pattern.
if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
columnVectors[i] =
createWritableColumnVector(
batchSize,
new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
groupType.getType(fieldIndex),
descriptors,
depth + 1);
} else {
columnVectors[i] =
createWritableColumnVector(
batchSize,
rowType.getTypeAt(i),
groupType.getType(fieldIndex),
descriptors,
depth + 1);
}
}
}
return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
}