hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [337:444]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Creates a {@link ColumnReader} for a single field, dispatching on the Flink logical type
   * and, for TIMESTAMP and DECIMAL, on the parquet primitive type actually stored in the file.
   *
   * <p>Complex types recurse: ARRAY and MAP wrap element readers, ROW matches each field of the
   * physical group by name and substitutes an {@link EmptyColumnReader} for fields that are
   * absent from the file (schema evolution: reading an old file with a new extended schema).
   *
   * @param utcTimestamp whether timestamp values are interpreted as UTC instead of local time
   * @param fieldType    Flink logical type of the field to read
   * @param physicalType parquet type of the field in the file schema
   * @param columns      column descriptors for the current (sub-)schema
   * @param pages        page store supplying the column chunk pages
   * @param depth        current nesting depth, used to filter the descriptors of this field
   * @return a reader producing values of {@code fieldType}
   * @throws IOException if a page reader cannot be obtained
   */
  private static ColumnReader createColumnReader(
      boolean utcTimestamp,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      PageReadStore pages,
      int depth) throws IOException {
    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
    ColumnDescriptor descriptor = descriptors.get(0);
    PageReader pageReader = pages.getPageReader(descriptor);
    switch (fieldType.getTypeRoot()) {
      case BOOLEAN:
        return new BooleanColumnReader(descriptor, pageReader);
      case TINYINT:
        return new ByteColumnReader(descriptor, pageReader);
      case DOUBLE:
        return new DoubleColumnReader(descriptor, pageReader);
      case FLOAT:
        return new FloatColumnReader(descriptor, pageReader);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        // DATE and TIME(p<=3) are stored as INT32, same as INTEGER.
        return new IntColumnReader(descriptor, pageReader);
      case BIGINT:
        return new LongColumnReader(descriptor, pageReader);
      case SMALLINT:
        return new ShortColumnReader(descriptor, pageReader);
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        return new BytesColumnReader(descriptor, pageReader);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT64:
            // Only the two timestamp roots reach here, so the cast is exhaustive.
            int precision = fieldType instanceof TimestampType
                ? ((TimestampType) fieldType).getPrecision()
                : ((LocalZonedTimestampType) fieldType).getPrecision();
            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
          case INT96:
            return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for timestamp column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case DECIMAL:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT32:
            return new IntColumnReader(descriptor, pageReader);
          case INT64:
            return new LongColumnReader(descriptor, pageReader);
          case BINARY:
            return new BytesColumnReader(descriptor, pageReader);
          case FIXED_LEN_BYTE_ARRAY:
            return new FixedLenBytesColumnReader(
                descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for decimal column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case ARRAY:
        return new ArrayColumnReader(
            descriptor,
            pageReader,
            utcTimestamp,
            descriptor.getPrimitiveType(),
            fieldType);
      case MAP:
        MapType mapType = (MapType) fieldType;
        // A parquet map is two sibling repeated columns; read keys and values as arrays.
        ColumnDescriptor valueDescriptor = descriptors.get(1);
        ArrayColumnReader keyReader =
            new ArrayColumnReader(
                descriptor,
                pageReader,
                utcTimestamp,
                descriptor.getPrimitiveType(),
                new ArrayType(mapType.getKeyType()));
        ArrayColumnReader valueReader =
            new ArrayColumnReader(
                valueDescriptor,
                pages.getPageReader(valueDescriptor),
                utcTimestamp,
                valueDescriptor.getPrimitiveType(),
                new ArrayType(mapType.getValueType()));
        return new MapColumnReader(keyReader, valueReader, fieldType);
      case ROW:
        RowType rowType = (RowType) fieldType;
        GroupType groupType = physicalType.asGroupType();
        List<ColumnReader> fieldReaders = new ArrayList<>();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
          // schema evolution: read the parquet file with a new extended field name.
          int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
          if (fieldIndex < 0) {
            // Field not present in the file; emit nulls for it.
            fieldReaders.add(new EmptyColumnReader());
          } else {
            fieldReaders.add(
                createColumnReader(
                    utcTimestamp,
                    rowType.getTypeAt(i),
                    groupType.getType(fieldIndex),
                    descriptors,
                    pages,
                    depth + 1));
          }
        }
        return new RowColumnReader(fieldReaders);
      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [337:444]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Creates a {@link ColumnReader} for a single field, dispatching on the Flink logical type
   * and, for TIMESTAMP and DECIMAL, on the parquet primitive type actually stored in the file.
   *
   * <p>Complex types recurse: ARRAY and MAP wrap element readers, ROW matches each field of the
   * physical group by name and substitutes an {@link EmptyColumnReader} for fields that are
   * absent from the file (schema evolution: reading an old file with a new extended schema).
   *
   * @param utcTimestamp whether timestamp values are interpreted as UTC instead of local time
   * @param fieldType    Flink logical type of the field to read
   * @param physicalType parquet type of the field in the file schema
   * @param columns      column descriptors for the current (sub-)schema
   * @param pages        page store supplying the column chunk pages
   * @param depth        current nesting depth, used to filter the descriptors of this field
   * @return a reader producing values of {@code fieldType}
   * @throws IOException if a page reader cannot be obtained
   */
  private static ColumnReader createColumnReader(
      boolean utcTimestamp,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      PageReadStore pages,
      int depth) throws IOException {
    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
    ColumnDescriptor descriptor = descriptors.get(0);
    PageReader pageReader = pages.getPageReader(descriptor);
    switch (fieldType.getTypeRoot()) {
      case BOOLEAN:
        return new BooleanColumnReader(descriptor, pageReader);
      case TINYINT:
        return new ByteColumnReader(descriptor, pageReader);
      case DOUBLE:
        return new DoubleColumnReader(descriptor, pageReader);
      case FLOAT:
        return new FloatColumnReader(descriptor, pageReader);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        // DATE and TIME(p<=3) are stored as INT32, same as INTEGER.
        return new IntColumnReader(descriptor, pageReader);
      case BIGINT:
        return new LongColumnReader(descriptor, pageReader);
      case SMALLINT:
        return new ShortColumnReader(descriptor, pageReader);
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        return new BytesColumnReader(descriptor, pageReader);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT64:
            // Only the two timestamp roots reach here, so the cast is exhaustive.
            int precision = fieldType instanceof TimestampType
                ? ((TimestampType) fieldType).getPrecision()
                : ((LocalZonedTimestampType) fieldType).getPrecision();
            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
          case INT96:
            return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for timestamp column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case DECIMAL:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT32:
            return new IntColumnReader(descriptor, pageReader);
          case INT64:
            return new LongColumnReader(descriptor, pageReader);
          case BINARY:
            return new BytesColumnReader(descriptor, pageReader);
          case FIXED_LEN_BYTE_ARRAY:
            return new FixedLenBytesColumnReader(
                descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for decimal column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case ARRAY:
        return new ArrayColumnReader(
            descriptor,
            pageReader,
            utcTimestamp,
            descriptor.getPrimitiveType(),
            fieldType);
      case MAP:
        MapType mapType = (MapType) fieldType;
        // A parquet map is two sibling repeated columns; read keys and values as arrays.
        ColumnDescriptor valueDescriptor = descriptors.get(1);
        ArrayColumnReader keyReader =
            new ArrayColumnReader(
                descriptor,
                pageReader,
                utcTimestamp,
                descriptor.getPrimitiveType(),
                new ArrayType(mapType.getKeyType()));
        ArrayColumnReader valueReader =
            new ArrayColumnReader(
                valueDescriptor,
                pages.getPageReader(valueDescriptor),
                utcTimestamp,
                valueDescriptor.getPrimitiveType(),
                new ArrayType(mapType.getValueType()));
        return new MapColumnReader(keyReader, valueReader, fieldType);
      case ROW:
        RowType rowType = (RowType) fieldType;
        GroupType groupType = physicalType.asGroupType();
        List<ColumnReader> fieldReaders = new ArrayList<>();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
          // schema evolution: read the parquet file with a new extended field name.
          int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
          if (fieldIndex < 0) {
            // Field not present in the file; emit nulls for it.
            fieldReaders.add(new EmptyColumnReader());
          } else {
            fieldReaders.add(
                createColumnReader(
                    utcTimestamp,
                    rowType.getTypeAt(i),
                    groupType.getType(fieldIndex),
                    descriptors,
                    pages,
                    depth + 1));
          }
        }
        return new RowColumnReader(fieldReaders);
      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [337:444]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Creates a {@link ColumnReader} for a single field, dispatching on the Flink logical type
   * and, for TIMESTAMP and DECIMAL, on the parquet primitive type actually stored in the file.
   *
   * <p>Complex types recurse: ARRAY and MAP wrap element readers, ROW matches each field of the
   * physical group by name and substitutes an {@link EmptyColumnReader} for fields that are
   * absent from the file (schema evolution: reading an old file with a new extended schema).
   *
   * @param utcTimestamp whether timestamp values are interpreted as UTC instead of local time
   * @param fieldType    Flink logical type of the field to read
   * @param physicalType parquet type of the field in the file schema
   * @param columns      column descriptors for the current (sub-)schema
   * @param pages        page store supplying the column chunk pages
   * @param depth        current nesting depth, used to filter the descriptors of this field
   * @return a reader producing values of {@code fieldType}
   * @throws IOException if a page reader cannot be obtained
   */
  private static ColumnReader createColumnReader(
      boolean utcTimestamp,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      PageReadStore pages,
      int depth) throws IOException {
    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
    ColumnDescriptor descriptor = descriptors.get(0);
    PageReader pageReader = pages.getPageReader(descriptor);
    switch (fieldType.getTypeRoot()) {
      case BOOLEAN:
        return new BooleanColumnReader(descriptor, pageReader);
      case TINYINT:
        return new ByteColumnReader(descriptor, pageReader);
      case DOUBLE:
        return new DoubleColumnReader(descriptor, pageReader);
      case FLOAT:
        return new FloatColumnReader(descriptor, pageReader);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        // DATE and TIME(p<=3) are stored as INT32, same as INTEGER.
        return new IntColumnReader(descriptor, pageReader);
      case BIGINT:
        return new LongColumnReader(descriptor, pageReader);
      case SMALLINT:
        return new ShortColumnReader(descriptor, pageReader);
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        return new BytesColumnReader(descriptor, pageReader);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT64:
            // Only the two timestamp roots reach here, so the cast is exhaustive.
            int precision = fieldType instanceof TimestampType
                ? ((TimestampType) fieldType).getPrecision()
                : ((LocalZonedTimestampType) fieldType).getPrecision();
            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
          case INT96:
            return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for timestamp column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case DECIMAL:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT32:
            return new IntColumnReader(descriptor, pageReader);
          case INT64:
            return new LongColumnReader(descriptor, pageReader);
          case BINARY:
            return new BytesColumnReader(descriptor, pageReader);
          case FIXED_LEN_BYTE_ARRAY:
            return new FixedLenBytesColumnReader(
                descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for decimal column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case ARRAY:
        return new ArrayColumnReader(
            descriptor,
            pageReader,
            utcTimestamp,
            descriptor.getPrimitiveType(),
            fieldType);
      case MAP:
        MapType mapType = (MapType) fieldType;
        // A parquet map is two sibling repeated columns; read keys and values as arrays.
        ColumnDescriptor valueDescriptor = descriptors.get(1);
        ArrayColumnReader keyReader =
            new ArrayColumnReader(
                descriptor,
                pageReader,
                utcTimestamp,
                descriptor.getPrimitiveType(),
                new ArrayType(mapType.getKeyType()));
        ArrayColumnReader valueReader =
            new ArrayColumnReader(
                valueDescriptor,
                pages.getPageReader(valueDescriptor),
                utcTimestamp,
                valueDescriptor.getPrimitiveType(),
                new ArrayType(mapType.getValueType()));
        return new MapColumnReader(keyReader, valueReader, fieldType);
      case ROW:
        RowType rowType = (RowType) fieldType;
        GroupType groupType = physicalType.asGroupType();
        List<ColumnReader> fieldReaders = new ArrayList<>();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
          // schema evolution: read the parquet file with a new extended field name.
          int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
          if (fieldIndex < 0) {
            // Field not present in the file; emit nulls for it.
            fieldReaders.add(new EmptyColumnReader());
          } else {
            fieldReaders.add(
                createColumnReader(
                    utcTimestamp,
                    rowType.getTypeAt(i),
                    groupType.getType(fieldIndex),
                    descriptors,
                    pages,
                    depth + 1));
          }
        }
        return new RowColumnReader(fieldReaders);
      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [337:444]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Creates a {@link ColumnReader} for a single field, dispatching on the Flink logical type
   * and, for TIMESTAMP and DECIMAL, on the parquet primitive type actually stored in the file.
   *
   * <p>Complex types recurse: ARRAY and MAP wrap element readers, ROW matches each field of the
   * physical group by name and substitutes an {@link EmptyColumnReader} for fields that are
   * absent from the file (schema evolution: reading an old file with a new extended schema).
   *
   * @param utcTimestamp whether timestamp values are interpreted as UTC instead of local time
   * @param fieldType    Flink logical type of the field to read
   * @param physicalType parquet type of the field in the file schema
   * @param columns      column descriptors for the current (sub-)schema
   * @param pages        page store supplying the column chunk pages
   * @param depth        current nesting depth, used to filter the descriptors of this field
   * @return a reader producing values of {@code fieldType}
   * @throws IOException if a page reader cannot be obtained
   */
  private static ColumnReader createColumnReader(
      boolean utcTimestamp,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      PageReadStore pages,
      int depth) throws IOException {
    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
    ColumnDescriptor descriptor = descriptors.get(0);
    PageReader pageReader = pages.getPageReader(descriptor);
    switch (fieldType.getTypeRoot()) {
      case BOOLEAN:
        return new BooleanColumnReader(descriptor, pageReader);
      case TINYINT:
        return new ByteColumnReader(descriptor, pageReader);
      case DOUBLE:
        return new DoubleColumnReader(descriptor, pageReader);
      case FLOAT:
        return new FloatColumnReader(descriptor, pageReader);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        // DATE and TIME(p<=3) are stored as INT32, same as INTEGER.
        return new IntColumnReader(descriptor, pageReader);
      case BIGINT:
        return new LongColumnReader(descriptor, pageReader);
      case SMALLINT:
        return new ShortColumnReader(descriptor, pageReader);
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        return new BytesColumnReader(descriptor, pageReader);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT64:
            // Only the two timestamp roots reach here, so the cast is exhaustive.
            int precision = fieldType instanceof TimestampType
                ? ((TimestampType) fieldType).getPrecision()
                : ((LocalZonedTimestampType) fieldType).getPrecision();
            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
          case INT96:
            return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for timestamp column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case DECIMAL:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT32:
            return new IntColumnReader(descriptor, pageReader);
          case INT64:
            return new LongColumnReader(descriptor, pageReader);
          case BINARY:
            return new BytesColumnReader(descriptor, pageReader);
          case FIXED_LEN_BYTE_ARRAY:
            return new FixedLenBytesColumnReader(
                descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for decimal column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case ARRAY:
        return new ArrayColumnReader(
            descriptor,
            pageReader,
            utcTimestamp,
            descriptor.getPrimitiveType(),
            fieldType);
      case MAP:
        MapType mapType = (MapType) fieldType;
        // A parquet map is two sibling repeated columns; read keys and values as arrays.
        ColumnDescriptor valueDescriptor = descriptors.get(1);
        ArrayColumnReader keyReader =
            new ArrayColumnReader(
                descriptor,
                pageReader,
                utcTimestamp,
                descriptor.getPrimitiveType(),
                new ArrayType(mapType.getKeyType()));
        ArrayColumnReader valueReader =
            new ArrayColumnReader(
                valueDescriptor,
                pages.getPageReader(valueDescriptor),
                utcTimestamp,
                valueDescriptor.getPrimitiveType(),
                new ArrayType(mapType.getValueType()));
        return new MapColumnReader(keyReader, valueReader, fieldType);
      case ROW:
        RowType rowType = (RowType) fieldType;
        GroupType groupType = physicalType.asGroupType();
        List<ColumnReader> fieldReaders = new ArrayList<>();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
          // schema evolution: read the parquet file with a new extended field name.
          int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
          if (fieldIndex < 0) {
            // Field not present in the file; emit nulls for it.
            fieldReaders.add(new EmptyColumnReader());
          } else {
            fieldReaders.add(
                createColumnReader(
                    utcTimestamp,
                    rowType.getTypeAt(i),
                    groupType.getType(fieldIndex),
                    descriptors,
                    pages,
                    depth + 1));
          }
        }
        return new RowColumnReader(fieldReaders);
      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java [337:444]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Creates a {@link ColumnReader} for a single field, dispatching on the Flink logical type
   * and, for TIMESTAMP and DECIMAL, on the parquet primitive type actually stored in the file.
   *
   * <p>Complex types recurse: ARRAY and MAP wrap element readers, ROW matches each field of the
   * physical group by name and substitutes an {@link EmptyColumnReader} for fields that are
   * absent from the file (schema evolution: reading an old file with a new extended schema).
   *
   * @param utcTimestamp whether timestamp values are interpreted as UTC instead of local time
   * @param fieldType    Flink logical type of the field to read
   * @param physicalType parquet type of the field in the file schema
   * @param columns      column descriptors for the current (sub-)schema
   * @param pages        page store supplying the column chunk pages
   * @param depth        current nesting depth, used to filter the descriptors of this field
   * @return a reader producing values of {@code fieldType}
   * @throws IOException if a page reader cannot be obtained
   */
  private static ColumnReader createColumnReader(
      boolean utcTimestamp,
      LogicalType fieldType,
      Type physicalType,
      List<ColumnDescriptor> columns,
      PageReadStore pages,
      int depth) throws IOException {
    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
    ColumnDescriptor descriptor = descriptors.get(0);
    PageReader pageReader = pages.getPageReader(descriptor);
    switch (fieldType.getTypeRoot()) {
      case BOOLEAN:
        return new BooleanColumnReader(descriptor, pageReader);
      case TINYINT:
        return new ByteColumnReader(descriptor, pageReader);
      case DOUBLE:
        return new DoubleColumnReader(descriptor, pageReader);
      case FLOAT:
        return new FloatColumnReader(descriptor, pageReader);
      case INTEGER:
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
        // DATE and TIME(p<=3) are stored as INT32, same as INTEGER.
        return new IntColumnReader(descriptor, pageReader);
      case BIGINT:
        return new LongColumnReader(descriptor, pageReader);
      case SMALLINT:
        return new ShortColumnReader(descriptor, pageReader);
      case CHAR:
      case VARCHAR:
      case BINARY:
      case VARBINARY:
        return new BytesColumnReader(descriptor, pageReader);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT64:
            // Only the two timestamp roots reach here, so the cast is exhaustive.
            int precision = fieldType instanceof TimestampType
                ? ((TimestampType) fieldType).getPrecision()
                : ((LocalZonedTimestampType) fieldType).getPrecision();
            return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
          case INT96:
            return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for timestamp column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case DECIMAL:
        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
          case INT32:
            return new IntColumnReader(descriptor, pageReader);
          case INT64:
            return new LongColumnReader(descriptor, pageReader);
          case BINARY:
            return new BytesColumnReader(descriptor, pageReader);
          case FIXED_LEN_BYTE_ARRAY:
            return new FixedLenBytesColumnReader(
                descriptor, pageReader);
          default:
            throw new AssertionError(
                "Unexpected physical type for decimal column: "
                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
        }
      case ARRAY:
        return new ArrayColumnReader(
            descriptor,
            pageReader,
            utcTimestamp,
            descriptor.getPrimitiveType(),
            fieldType);
      case MAP:
        MapType mapType = (MapType) fieldType;
        // A parquet map is two sibling repeated columns; read keys and values as arrays.
        ColumnDescriptor valueDescriptor = descriptors.get(1);
        ArrayColumnReader keyReader =
            new ArrayColumnReader(
                descriptor,
                pageReader,
                utcTimestamp,
                descriptor.getPrimitiveType(),
                new ArrayType(mapType.getKeyType()));
        ArrayColumnReader valueReader =
            new ArrayColumnReader(
                valueDescriptor,
                pages.getPageReader(valueDescriptor),
                utcTimestamp,
                valueDescriptor.getPrimitiveType(),
                new ArrayType(mapType.getValueType()));
        return new MapColumnReader(keyReader, valueReader, fieldType);
      case ROW:
        RowType rowType = (RowType) fieldType;
        GroupType groupType = physicalType.asGroupType();
        List<ColumnReader> fieldReaders = new ArrayList<>();
        for (int i = 0; i < rowType.getFieldCount(); i++) {
          // schema evolution: read the parquet file with a new extended field name.
          int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
          if (fieldIndex < 0) {
            // Field not present in the file; emit nulls for it.
            fieldReaders.add(new EmptyColumnReader());
          } else {
            fieldReaders.add(
                createColumnReader(
                    utcTimestamp,
                    rowType.getTypeAt(i),
                    groupType.getType(fieldIndex),
                    descriptors,
                    pages,
                    depth + 1));
          }
        }
        return new RowColumnReader(fieldReaders);
      default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



