hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [454:564]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the {@link WritableColumnVector} used to hold values of {@code fieldType},
   * recursing into ARRAY, MAP and ROW types to build their child vectors.
   *
   * <p>The given column descriptors are first narrowed with {@code filterDescriptors}; the
   * primitive type of the first remaining descriptor is then checked (via
   * {@code checkArgument}) against the logical type root of every non-nested type.
   *
   * @param batchSize    size handed to every vector constructor
   * @param fieldType    logical type of the field to read
   * @param physicalType Parquet type of the field
   * @param columns      candidate column descriptors, narrowed by {@code depth} and {@code physicalType}
   * @param depth        nesting level passed to {@code filterDescriptors}
   * @return a vector matching the type root of {@code fieldType}
   * @throws UnsupportedOperationException if the type root is not handled here
   */
  private static WritableColumnVector createWritableColumnVector(
      int batchSize,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      int depth) {
    final List<ColumnDescriptor> matched = filterDescriptors(depth, physicalType, columns);
    // Only the first matching descriptor is consulted for the primitive type checks below.
    final PrimitiveType leaf = matched.get(0).getPrimitiveType();
    final PrimitiveType.PrimitiveTypeName parquetType = leaf.getPrimitiveTypeName();

    switch (fieldType.getTypeRoot()) {
      // ---- logical types that must be backed by a Parquet INT32 ----
      case TINYINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapByteVector(batchSize);
      case SMALLINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapShortVector(batchSize);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapIntVector(batchSize);

      // ---- remaining fixed-width primitives ----
      case BIGINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT64,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapLongVector(batchSize);
      case FLOAT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.FLOAT,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapFloatVector(batchSize);
      case DOUBLE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.DOUBLE,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDoubleVector(batchSize);
      case BOOLEAN:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BOOLEAN,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBooleanVector(batchSize);

      // ---- character and byte strings ----
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BINARY,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBytesVector(batchSize);

      // ---- decimals: FIXED_LEN_BYTE_ARRAY or BINARY carrying the DECIMAL original type ----
      case DECIMAL: {
        final boolean byteBacked =
            parquetType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                || parquetType == PrimitiveType.PrimitiveTypeName.BINARY;
        checkArgument(
            byteBacked && leaf.getOriginalType() == OriginalType.DECIMAL,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDecimalVector(batchSize);
      }

      // ---- timestamps: only the TIME_MICROS original type is rejected ----
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        checkArgument(
            leaf.getOriginalType() != OriginalType.TIME_MICROS,
            getOriginalTypeCheckFailureMessage(leaf.getOriginalType(), fieldType));
        return new HeapTimestampVector(batchSize);

      // ---- nested types ----
      case ARRAY: {
        final ArrayType array = (ArrayType) fieldType;
        // The element vector is built against the same physical type and depth as the array.
        final WritableColumnVector elements =
            createWritableColumnVector(batchSize, array.getElementType(), physicalType, matched, depth);
        return new HeapArrayVector(batchSize, elements);
      }
      case MAP: {
        final MapType map = (MapType) fieldType;
        final GroupType keyValueGroup = physicalType.asGroupType().getType(0).asGroupType();
        // A map column has three path levels, so key and value are resolved two levels deeper.
        final int entryDepth = depth + 2;
        final WritableColumnVector keys =
            createWritableColumnVector(batchSize, map.getKeyType(), keyValueGroup.getType(0), matched, entryDepth);
        final WritableColumnVector values =
            createWritableColumnVector(batchSize, map.getValueType(), keyValueGroup.getType(1), matched, entryDepth);
        return new HeapMapColumnVector(batchSize, keys, values);
      }
      case ROW: {
        final RowType row = (RowType) fieldType;
        final GroupType group = physicalType.asGroupType();
        final WritableColumnVector[] children = new WritableColumnVector[row.getFieldCount()];
        for (int pos = 0; pos < children.length; pos++) {
          // Schema evolution: fields are resolved by name; a negative index (field presumably
          // absent from the file's group type) yields a constant vector built from null.
          final int physicalIndex =
              getFieldIndexInPhysicalType(row.getFields().get(pos).getName(), group);
          children[pos] = physicalIndex < 0
              ? (WritableColumnVector) createVectorFromConstant(row.getTypeAt(pos), null, batchSize)
              : createWritableColumnVector(
                  batchSize, row.getTypeAt(pos), group.getType(physicalIndex), matched, depth + 1);
        }
        return new HeapRowColumnVector(batchSize, children);
      }

      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [454:564]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the {@link WritableColumnVector} used to hold values of {@code fieldType},
   * recursing into ARRAY, MAP and ROW types to build their child vectors.
   *
   * <p>The given column descriptors are first narrowed with {@code filterDescriptors}; the
   * primitive type of the first remaining descriptor is then checked (via
   * {@code checkArgument}) against the logical type root of every non-nested type.
   *
   * @param batchSize    size handed to every vector constructor
   * @param fieldType    logical type of the field to read
   * @param physicalType Parquet type of the field
   * @param columns      candidate column descriptors, narrowed by {@code depth} and {@code physicalType}
   * @param depth        nesting level passed to {@code filterDescriptors}
   * @return a vector matching the type root of {@code fieldType}
   * @throws UnsupportedOperationException if the type root is not handled here
   */
  private static WritableColumnVector createWritableColumnVector(
      int batchSize,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      int depth) {
    final List<ColumnDescriptor> matched = filterDescriptors(depth, physicalType, columns);
    // Only the first matching descriptor is consulted for the primitive type checks below.
    final PrimitiveType leaf = matched.get(0).getPrimitiveType();
    final PrimitiveType.PrimitiveTypeName parquetType = leaf.getPrimitiveTypeName();

    switch (fieldType.getTypeRoot()) {
      // ---- logical types that must be backed by a Parquet INT32 ----
      case TINYINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapByteVector(batchSize);
      case SMALLINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapShortVector(batchSize);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapIntVector(batchSize);

      // ---- remaining fixed-width primitives ----
      case BIGINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT64,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapLongVector(batchSize);
      case FLOAT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.FLOAT,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapFloatVector(batchSize);
      case DOUBLE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.DOUBLE,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDoubleVector(batchSize);
      case BOOLEAN:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BOOLEAN,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBooleanVector(batchSize);

      // ---- character and byte strings ----
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BINARY,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBytesVector(batchSize);

      // ---- decimals: FIXED_LEN_BYTE_ARRAY or BINARY carrying the DECIMAL original type ----
      case DECIMAL: {
        final boolean byteBacked =
            parquetType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                || parquetType == PrimitiveType.PrimitiveTypeName.BINARY;
        checkArgument(
            byteBacked && leaf.getOriginalType() == OriginalType.DECIMAL,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDecimalVector(batchSize);
      }

      // ---- timestamps: only the TIME_MICROS original type is rejected ----
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        checkArgument(
            leaf.getOriginalType() != OriginalType.TIME_MICROS,
            getOriginalTypeCheckFailureMessage(leaf.getOriginalType(), fieldType));
        return new HeapTimestampVector(batchSize);

      // ---- nested types ----
      case ARRAY: {
        final ArrayType array = (ArrayType) fieldType;
        // The element vector is built against the same physical type and depth as the array.
        final WritableColumnVector elements =
            createWritableColumnVector(batchSize, array.getElementType(), physicalType, matched, depth);
        return new HeapArrayVector(batchSize, elements);
      }
      case MAP: {
        final MapType map = (MapType) fieldType;
        final GroupType keyValueGroup = physicalType.asGroupType().getType(0).asGroupType();
        // A map column has three path levels, so key and value are resolved two levels deeper.
        final int entryDepth = depth + 2;
        final WritableColumnVector keys =
            createWritableColumnVector(batchSize, map.getKeyType(), keyValueGroup.getType(0), matched, entryDepth);
        final WritableColumnVector values =
            createWritableColumnVector(batchSize, map.getValueType(), keyValueGroup.getType(1), matched, entryDepth);
        return new HeapMapColumnVector(batchSize, keys, values);
      }
      case ROW: {
        final RowType row = (RowType) fieldType;
        final GroupType group = physicalType.asGroupType();
        final WritableColumnVector[] children = new WritableColumnVector[row.getFieldCount()];
        for (int pos = 0; pos < children.length; pos++) {
          // Schema evolution: fields are resolved by name; a negative index (field presumably
          // absent from the file's group type) yields a constant vector built from null.
          final int physicalIndex =
              getFieldIndexInPhysicalType(row.getFields().get(pos).getName(), group);
          children[pos] = physicalIndex < 0
              ? (WritableColumnVector) createVectorFromConstant(row.getTypeAt(pos), null, batchSize)
              : createWritableColumnVector(
                  batchSize, row.getTypeAt(pos), group.getType(physicalIndex), matched, depth + 1);
        }
        return new HeapRowColumnVector(batchSize, children);
      }

      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [454:564]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the {@link WritableColumnVector} used to hold values of {@code fieldType},
   * recursing into ARRAY, MAP and ROW types to build their child vectors.
   *
   * <p>The given column descriptors are first narrowed with {@code filterDescriptors}; the
   * primitive type of the first remaining descriptor is then checked (via
   * {@code checkArgument}) against the logical type root of every non-nested type.
   *
   * @param batchSize    size handed to every vector constructor
   * @param fieldType    logical type of the field to read
   * @param physicalType Parquet type of the field
   * @param columns      candidate column descriptors, narrowed by {@code depth} and {@code physicalType}
   * @param depth        nesting level passed to {@code filterDescriptors}
   * @return a vector matching the type root of {@code fieldType}
   * @throws UnsupportedOperationException if the type root is not handled here
   */
  private static WritableColumnVector createWritableColumnVector(
      int batchSize,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      int depth) {
    final List<ColumnDescriptor> matched = filterDescriptors(depth, physicalType, columns);
    // Only the first matching descriptor is consulted for the primitive type checks below.
    final PrimitiveType leaf = matched.get(0).getPrimitiveType();
    final PrimitiveType.PrimitiveTypeName parquetType = leaf.getPrimitiveTypeName();

    switch (fieldType.getTypeRoot()) {
      // ---- logical types that must be backed by a Parquet INT32 ----
      case TINYINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapByteVector(batchSize);
      case SMALLINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapShortVector(batchSize);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapIntVector(batchSize);

      // ---- remaining fixed-width primitives ----
      case BIGINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT64,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapLongVector(batchSize);
      case FLOAT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.FLOAT,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapFloatVector(batchSize);
      case DOUBLE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.DOUBLE,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDoubleVector(batchSize);
      case BOOLEAN:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BOOLEAN,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBooleanVector(batchSize);

      // ---- character and byte strings ----
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BINARY,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBytesVector(batchSize);

      // ---- decimals: FIXED_LEN_BYTE_ARRAY or BINARY carrying the DECIMAL original type ----
      case DECIMAL: {
        final boolean byteBacked =
            parquetType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                || parquetType == PrimitiveType.PrimitiveTypeName.BINARY;
        checkArgument(
            byteBacked && leaf.getOriginalType() == OriginalType.DECIMAL,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDecimalVector(batchSize);
      }

      // ---- timestamps: only the TIME_MICROS original type is rejected ----
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        checkArgument(
            leaf.getOriginalType() != OriginalType.TIME_MICROS,
            getOriginalTypeCheckFailureMessage(leaf.getOriginalType(), fieldType));
        return new HeapTimestampVector(batchSize);

      // ---- nested types ----
      case ARRAY: {
        final ArrayType array = (ArrayType) fieldType;
        // The element vector is built against the same physical type and depth as the array.
        final WritableColumnVector elements =
            createWritableColumnVector(batchSize, array.getElementType(), physicalType, matched, depth);
        return new HeapArrayVector(batchSize, elements);
      }
      case MAP: {
        final MapType map = (MapType) fieldType;
        final GroupType keyValueGroup = physicalType.asGroupType().getType(0).asGroupType();
        // A map column has three path levels, so key and value are resolved two levels deeper.
        final int entryDepth = depth + 2;
        final WritableColumnVector keys =
            createWritableColumnVector(batchSize, map.getKeyType(), keyValueGroup.getType(0), matched, entryDepth);
        final WritableColumnVector values =
            createWritableColumnVector(batchSize, map.getValueType(), keyValueGroup.getType(1), matched, entryDepth);
        return new HeapMapColumnVector(batchSize, keys, values);
      }
      case ROW: {
        final RowType row = (RowType) fieldType;
        final GroupType group = physicalType.asGroupType();
        final WritableColumnVector[] children = new WritableColumnVector[row.getFieldCount()];
        for (int pos = 0; pos < children.length; pos++) {
          // Schema evolution: fields are resolved by name; a negative index (field presumably
          // absent from the file's group type) yields a constant vector built from null.
          final int physicalIndex =
              getFieldIndexInPhysicalType(row.getFields().get(pos).getName(), group);
          children[pos] = physicalIndex < 0
              ? (WritableColumnVector) createVectorFromConstant(row.getTypeAt(pos), null, batchSize)
              : createWritableColumnVector(
                  batchSize, row.getTypeAt(pos), group.getType(physicalIndex), matched, depth + 1);
        }
        return new HeapRowColumnVector(batchSize, children);
      }

      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [454:564]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the {@link WritableColumnVector} used to hold values of {@code fieldType},
   * recursing into ARRAY, MAP and ROW types to build their child vectors.
   *
   * <p>The given column descriptors are first narrowed with {@code filterDescriptors}; the
   * primitive type of the first remaining descriptor is then checked (via
   * {@code checkArgument}) against the logical type root of every non-nested type.
   *
   * @param batchSize    size handed to every vector constructor
   * @param fieldType    logical type of the field to read
   * @param physicalType Parquet type of the field
   * @param columns      candidate column descriptors, narrowed by {@code depth} and {@code physicalType}
   * @param depth        nesting level passed to {@code filterDescriptors}
   * @return a vector matching the type root of {@code fieldType}
   * @throws UnsupportedOperationException if the type root is not handled here
   */
  private static WritableColumnVector createWritableColumnVector(
      int batchSize,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      int depth) {
    final List<ColumnDescriptor> matched = filterDescriptors(depth, physicalType, columns);
    // Only the first matching descriptor is consulted for the primitive type checks below.
    final PrimitiveType leaf = matched.get(0).getPrimitiveType();
    final PrimitiveType.PrimitiveTypeName parquetType = leaf.getPrimitiveTypeName();

    switch (fieldType.getTypeRoot()) {
      // ---- logical types that must be backed by a Parquet INT32 ----
      case TINYINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapByteVector(batchSize);
      case SMALLINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapShortVector(batchSize);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapIntVector(batchSize);

      // ---- remaining fixed-width primitives ----
      case BIGINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT64,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapLongVector(batchSize);
      case FLOAT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.FLOAT,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapFloatVector(batchSize);
      case DOUBLE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.DOUBLE,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDoubleVector(batchSize);
      case BOOLEAN:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BOOLEAN,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBooleanVector(batchSize);

      // ---- character and byte strings ----
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BINARY,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBytesVector(batchSize);

      // ---- decimals: FIXED_LEN_BYTE_ARRAY or BINARY carrying the DECIMAL original type ----
      case DECIMAL: {
        final boolean byteBacked =
            parquetType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                || parquetType == PrimitiveType.PrimitiveTypeName.BINARY;
        checkArgument(
            byteBacked && leaf.getOriginalType() == OriginalType.DECIMAL,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDecimalVector(batchSize);
      }

      // ---- timestamps: only the TIME_MICROS original type is rejected ----
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        checkArgument(
            leaf.getOriginalType() != OriginalType.TIME_MICROS,
            getOriginalTypeCheckFailureMessage(leaf.getOriginalType(), fieldType));
        return new HeapTimestampVector(batchSize);

      // ---- nested types ----
      case ARRAY: {
        final ArrayType array = (ArrayType) fieldType;
        // The element vector is built against the same physical type and depth as the array.
        final WritableColumnVector elements =
            createWritableColumnVector(batchSize, array.getElementType(), physicalType, matched, depth);
        return new HeapArrayVector(batchSize, elements);
      }
      case MAP: {
        final MapType map = (MapType) fieldType;
        final GroupType keyValueGroup = physicalType.asGroupType().getType(0).asGroupType();
        // A map column has three path levels, so key and value are resolved two levels deeper.
        final int entryDepth = depth + 2;
        final WritableColumnVector keys =
            createWritableColumnVector(batchSize, map.getKeyType(), keyValueGroup.getType(0), matched, entryDepth);
        final WritableColumnVector values =
            createWritableColumnVector(batchSize, map.getValueType(), keyValueGroup.getType(1), matched, entryDepth);
        return new HeapMapColumnVector(batchSize, keys, values);
      }
      case ROW: {
        final RowType row = (RowType) fieldType;
        final GroupType group = physicalType.asGroupType();
        final WritableColumnVector[] children = new WritableColumnVector[row.getFieldCount()];
        for (int pos = 0; pos < children.length; pos++) {
          // Schema evolution: fields are resolved by name; a negative index (field presumably
          // absent from the file's group type) yields a constant vector built from null.
          final int physicalIndex =
              getFieldIndexInPhysicalType(row.getFields().get(pos).getName(), group);
          children[pos] = physicalIndex < 0
              ? (WritableColumnVector) createVectorFromConstant(row.getTypeAt(pos), null, batchSize)
              : createWritableColumnVector(
                  batchSize, row.getTypeAt(pos), group.getType(physicalIndex), matched, depth + 1);
        }
        return new HeapRowColumnVector(batchSize, children);
      }

      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [454:564]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Builds the {@link WritableColumnVector} used to hold values of {@code fieldType},
   * recursing into ARRAY, MAP and ROW types to build their child vectors.
   *
   * <p>The given column descriptors are first narrowed with {@code filterDescriptors}; the
   * primitive type of the first remaining descriptor is then checked (via
   * {@code checkArgument}) against the logical type root of every non-nested type.
   *
   * @param batchSize    size handed to every vector constructor
   * @param fieldType    logical type of the field to read
   * @param physicalType Parquet type of the field
   * @param columns      candidate column descriptors, narrowed by {@code depth} and {@code physicalType}
   * @param depth        nesting level passed to {@code filterDescriptors}
   * @return a vector matching the type root of {@code fieldType}
   * @throws UnsupportedOperationException if the type root is not handled here
   */
  private static WritableColumnVector createWritableColumnVector(
      int batchSize,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      int depth) {
    final List<ColumnDescriptor> matched = filterDescriptors(depth, physicalType, columns);
    // Only the first matching descriptor is consulted for the primitive type checks below.
    final PrimitiveType leaf = matched.get(0).getPrimitiveType();
    final PrimitiveType.PrimitiveTypeName parquetType = leaf.getPrimitiveTypeName();

    switch (fieldType.getTypeRoot()) {
      // ---- logical types that must be backed by a Parquet INT32 ----
      case TINYINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapByteVector(batchSize);
      case SMALLINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapShortVector(batchSize);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT32,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapIntVector(batchSize);

      // ---- remaining fixed-width primitives ----
      case BIGINT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.INT64,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapLongVector(batchSize);
      case FLOAT:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.FLOAT,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapFloatVector(batchSize);
      case DOUBLE:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.DOUBLE,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDoubleVector(batchSize);
      case BOOLEAN:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BOOLEAN,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBooleanVector(batchSize);

      // ---- character and byte strings ----
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        checkArgument(
            parquetType == PrimitiveType.PrimitiveTypeName.BINARY,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapBytesVector(batchSize);

      // ---- decimals: FIXED_LEN_BYTE_ARRAY or BINARY carrying the DECIMAL original type ----
      case DECIMAL: {
        final boolean byteBacked =
            parquetType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
                || parquetType == PrimitiveType.PrimitiveTypeName.BINARY;
        checkArgument(
            byteBacked && leaf.getOriginalType() == OriginalType.DECIMAL,
            getPrimitiveTypeCheckFailureMessage(parquetType, fieldType));
        return new HeapDecimalVector(batchSize);
      }

      // ---- timestamps: only the TIME_MICROS original type is rejected ----
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        checkArgument(
            leaf.getOriginalType() != OriginalType.TIME_MICROS,
            getOriginalTypeCheckFailureMessage(leaf.getOriginalType(), fieldType));
        return new HeapTimestampVector(batchSize);

      // ---- nested types ----
      case ARRAY: {
        final ArrayType array = (ArrayType) fieldType;
        // The element vector is built against the same physical type and depth as the array.
        final WritableColumnVector elements =
            createWritableColumnVector(batchSize, array.getElementType(), physicalType, matched, depth);
        return new HeapArrayVector(batchSize, elements);
      }
      case MAP: {
        final MapType map = (MapType) fieldType;
        final GroupType keyValueGroup = physicalType.asGroupType().getType(0).asGroupType();
        // A map column has three path levels, so key and value are resolved two levels deeper.
        final int entryDepth = depth + 2;
        final WritableColumnVector keys =
            createWritableColumnVector(batchSize, map.getKeyType(), keyValueGroup.getType(0), matched, entryDepth);
        final WritableColumnVector values =
            createWritableColumnVector(batchSize, map.getValueType(), keyValueGroup.getType(1), matched, entryDepth);
        return new HeapMapColumnVector(batchSize, keys, values);
      }
      case ROW: {
        final RowType row = (RowType) fieldType;
        final GroupType group = physicalType.asGroupType();
        final WritableColumnVector[] children = new WritableColumnVector[row.getFieldCount()];
        for (int pos = 0; pos < children.length; pos++) {
          // Schema evolution: fields are resolved by name; a negative index (field presumably
          // absent from the file's group type) yields a constant vector built from null.
          final int physicalIndex =
              getFieldIndexInPhysicalType(row.getFields().get(pos).getName(), group);
          children[pos] = physicalIndex < 0
              ? (WritableColumnVector) createVectorFromConstant(row.getTypeAt(pos), null, batchSize)
              : createWritableColumnVector(
                  batchSize, row.getTypeAt(pos), group.getType(physicalIndex), matched, depth + 1);
        }
        return new HeapRowColumnVector(batchSize, children);
      }

      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



