in flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java [382:571]
/**
 * Creates a heap-backed {@link WritableColumnVector} for one field, after checking that the
 * Parquet primitive type of the field's first column descriptor is compatible with the requested
 * Flink logical type.
 *
 * <p>Nested types (ARRAY, MAP, MULTISET, ROW) are built by recursing into their children. While
 * unwrapping the Parquet group structure, {@code depth} is advanced so that the recursive calls
 * select the matching column descriptors.
 *
 * @param batchSize size passed to every heap vector created by this call
 * @param fieldType Flink logical type of the field
 * @param type Parquet type of the field
 * @param columnDescriptors candidate column descriptors, narrowed via {@code
 *     getAllColumnDescriptorByType(depth, type, columnDescriptors)}
 * @param depth current nesting level used when narrowing {@code columnDescriptors}
 * @return a new writable column vector for the field
 * @throws UnsupportedOperationException if the type root of {@code fieldType} is not handled
 *     here; a {@code checkArgument} precondition fails if the Parquet primitive type does not
 *     match the logical type
 */
public static WritableColumnVector createWritableColumnVector(
        int batchSize,
        LogicalType fieldType,
        Type type,
        List<ColumnDescriptor> columnDescriptors,
        int depth) {
    List<ColumnDescriptor> descriptors =
            getAllColumnDescriptorByType(depth, type, columnDescriptors);
    // The primitive type of the first matching descriptor drives the compatibility checks below.
    PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
    PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
    switch (fieldType.getTypeRoot()) {
        case BOOLEAN:
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
                    "Unexpected type: %s",
                    typeName);
            return new HeapBooleanVector(batchSize);
        case TINYINT:
            // TINYINT is read from an INT32 physical column.
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.INT32,
                    "Unexpected type: %s",
                    typeName);
            return new HeapByteVector(batchSize);
        case DOUBLE:
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
                    "Unexpected type: %s",
                    typeName);
            return new HeapDoubleVector(batchSize);
        case FLOAT:
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
                    "Unexpected type: %s",
                    typeName);
            return new HeapFloatVector(batchSize);
        case INTEGER:
        case DATE:
        case TIME_WITHOUT_TIME_ZONE:
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.INT32,
                    "Unexpected type: %s",
                    typeName);
            return new HeapIntVector(batchSize);
        case BIGINT:
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.INT64,
                    "Unexpected type: %s",
                    typeName);
            return new HeapLongVector(batchSize);
        case SMALLINT:
            // SMALLINT is read from an INT32 physical column.
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.INT32,
                    "Unexpected type: %s",
                    typeName);
            return new HeapShortVector(batchSize);
        case CHAR:
        case VARCHAR:
        case BINARY:
        case VARBINARY:
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.BINARY,
                    "Unexpected type: %s",
                    typeName);
            return new HeapBytesVector(batchSize);
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            checkArgument(
                    typeName == PrimitiveType.PrimitiveTypeName.INT96
                            || typeName == PrimitiveType.PrimitiveTypeName.INT64,
                    "Unexpected type: %s",
                    typeName);
            return new HeapTimestampVector(batchSize);
        case DECIMAL:
            // The vector kind depends on the precision bucket (32-bit, 64-bit, or wider); each
            // bucket accepts FIXED_LEN_BYTE_ARRAY or its own physical type, annotated DECIMAL.
            DecimalType decimalType = (DecimalType) fieldType;
            if (ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
                checkArgument(
                        (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                                        || typeName == PrimitiveType.PrimitiveTypeName.INT32)
                                && primitiveType.getOriginalType() == OriginalType.DECIMAL,
                        "Unexpected type: %s",
                        typeName);
                return new HeapIntVector(batchSize);
            } else if (ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) {
                checkArgument(
                        (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                                        || typeName == PrimitiveType.PrimitiveTypeName.INT64)
                                && primitiveType.getOriginalType() == OriginalType.DECIMAL,
                        "Unexpected type: %s",
                        typeName);
                return new HeapLongVector(batchSize);
            } else {
                checkArgument(
                        (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                                        || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                                && primitiveType.getOriginalType() == OriginalType.DECIMAL,
                        "Unexpected type: %s",
                        typeName);
                return new HeapBytesVector(batchSize);
            }
        case ARRAY:
            // The element vector is built from the same Parquet type and depth; element cases
            // (e.g. ROW, MAP) unwrap the LIST group themselves.
            ArrayType arrayType = (ArrayType) fieldType;
            return new HeapArrayVector(
                    batchSize,
                    createWritableColumnVector(
                            batchSize,
                            arrayType.getElementType(),
                            type,
                            columnDescriptors,
                            depth));
        case MAP:
            MapType mapType = (MapType) fieldType;
            LogicalTypeAnnotation mapTypeAnnotation = type.getLogicalTypeAnnotation();
            GroupType mapRepeatedType = type.asGroupType().getType(0).asGroupType();
            // Compare with the constant on the left: a group without a logical type annotation
            // yields null, which must not throw a NullPointerException (same as the ROW case).
            if (LogicalTypeAnnotation.listType().equals(mapTypeAnnotation)) {
                // `type` is a LIST group wrapping the map: descend from its repeated group to the
                // element, and, if that element is MAP-annotated, on to its first child.
                mapRepeatedType = mapRepeatedType.getType(0).asGroupType();
                depth++;
                if (LogicalTypeAnnotation.mapType()
                        .equals(mapRepeatedType.getLogicalTypeAnnotation())) {
                    mapRepeatedType = mapRepeatedType.getType(0).asGroupType();
                    depth++;
                }
            }
            // Children 0 and 1 of the repeated group are the key and the value.
            return new HeapMapVector(
                    batchSize,
                    createWritableColumnVector(
                            batchSize,
                            mapType.getKeyType(),
                            mapRepeatedType.getType(0),
                            descriptors,
                            depth + 2),
                    createWritableColumnVector(
                            batchSize,
                            mapType.getValueType(),
                            mapRepeatedType.getType(1),
                            descriptors,
                            depth + 2));
        case MULTISET:
            MultisetType multisetType = (MultisetType) fieldType;
            LogicalTypeAnnotation multisetTypeAnnotation = type.getLogicalTypeAnnotation();
            GroupType multisetRepeatedType = type.asGroupType().getType(0).asGroupType();
            // Null-safe comparison, same reasoning as in the MAP case.
            if (LogicalTypeAnnotation.listType().equals(multisetTypeAnnotation)) {
                multisetRepeatedType = multisetRepeatedType.getType(0).asGroupType();
                depth++;
                if (LogicalTypeAnnotation.mapType()
                        .equals(multisetRepeatedType.getLogicalTypeAnnotation())) {
                    multisetRepeatedType = multisetRepeatedType.getType(0).asGroupType();
                    depth++;
                }
            }
            // A multiset is stored like a map from element to a non-null INT count.
            return new HeapMapVector(
                    batchSize,
                    createWritableColumnVector(
                            batchSize,
                            multisetType.getElementType(),
                            multisetRepeatedType.getType(0),
                            descriptors,
                            depth + 2),
                    createWritableColumnVector(
                            batchSize,
                            new IntType(false),
                            multisetRepeatedType.getType(1),
                            descriptors,
                            depth + 2));
        case ROW:
            RowType rowType = (RowType) fieldType;
            GroupType groupType = type.asGroupType();
            if (LogicalTypeAnnotation.listType().equals(groupType.getLogicalTypeAnnotation())) {
                // The row is wrapped in a LIST-annotated group (e.g. it is an ARRAY element):
                // descend through the list group's repeated child to reach the row's own group.
                // Each of the two levels adds one to the depth.
                groupType = groupType.getType(0).asGroupType();
                groupType = groupType.getType(0).asGroupType();
                depth = depth + 2;
            }
            WritableColumnVector[] columnVectors =
                    new WritableColumnVector[rowType.getFieldCount()];
            for (int i = 0; i < columnVectors.length; i++) {
                columnVectors[i] =
                        createWritableColumnVector(
                                batchSize,
                                rowType.getTypeAt(i),
                                groupType.getType(i),
                                descriptors,
                                depth + 1);
            }
            return new HeapRowVector(batchSize, columnVectors);
        default:
            throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
}