hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java [118:165]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public ParquetColumnarRowSplitReader(
      boolean utcTimestamp,
      boolean caseSensitive,
      Configuration conf,
      LogicalType[] selectedTypes,
      String[] selectedFieldNames,
      ColumnBatchGenerator generator,
      int batchSize,
      Path path,
      long splitStart,
      long splitLength,
      FilterPredicate filterPredicate,
      UnboundRecordFilter recordFilter) throws IOException {
    this.utcTimestamp = utcTimestamp;
    this.batchSize = batchSize;
    // read the footer for this split's byte range, then apply the predicate
    // push-down filter to prune row groups that cannot contain matching rows
    ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
    MessageType fileSchema = footer.getFileMetaData().getSchema();
    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);

    this.fileSchema = fileSchema;

    // clip the file schema to the selected fields; a null entry marks a selected
    // field that is not present in the file and is filtered out below
    Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
    int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray();
    Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new);

    this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new);
    this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet");
    this.reader = new ParquetFileReader(
        conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());

    long totalRowCount = 0;
    for (BlockMetaData block : blocks) {
      totalRowCount += block.getRowCount();
    }
    this.totalRowCount = totalRowCount;
    this.nextRow = 0;
    this.rowsInBatch = 0;
    this.rowsReturned = 0;

    checkSchema();

    this.writableVectors = createWritableVectors();
    // map the read vectors back to their positions in the full selected-field layout
    ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
    this.columnarBatch = generator.generate(columnVectors);
    this.row = new ColumnarRowData(columnarBatch);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
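
The same constructor recurs verbatim in every Flink-version module below. For orientation, here is a minimal, hypothetical wiring of it; the file path, field names, types, batch size, class name, and the trivial batch generator are all illustrative assumptions, and the read loop assumes the reachedEnd()/nextRecord()/close() methods this class carries over from Flink's original ParquetColumnarRowSplitReader.

import org.apache.flink.table.data.RowData;
// note: on Flink 1.14 VectorizedColumnBatch lives in org.apache.flink.table.data.vector instead
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;

public class SplitReaderUsageSketch {

  public static void main(String[] args) throws Exception {
    Configuration hadoopConf = new Configuration();

    // read two columns, "id" (INT32) and "name" (UTF8), from a single illustrative file
    LogicalType[] selectedTypes = {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)};
    String[] selectedFieldNames = {"id", "name"};

    ParquetColumnarRowSplitReader reader = new ParquetColumnarRowSplitReader(
        true,                                     // utcTimestamp
        true,                                     // caseSensitive
        hadoopConf,
        selectedTypes,
        selectedFieldNames,
        VectorizedColumnBatch::new,               // generator: wrap the vectors into a batch as-is
        2048,                                     // batchSize
        new Path("file:///tmp/example.parquet"),  // illustrative path
        0L,                                       // splitStart
        Long.MAX_VALUE,                           // splitLength: cover the whole file
        null,                                     // no FilterPredicate push-down
        null);                                    // no UnboundRecordFilter

    try {
      while (!reader.reachedEnd()) {
        RowData row = reader.nextRecord();
        System.out.println(row.getInt(0) + "," + row.getString(1));
      }
    } finally {
      reader.close();
    }
  }
}

The generator is the main point of variation between callers: instead of wrapping the vectors as-is, a caller can splice in additional vectors (for example constant vectors for partition columns) before building the batch, which is why the constructor delegates batch creation rather than instantiating VectorizedColumnBatch itself.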



hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java [118:165]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public ParquetColumnarRowSplitReader(
      boolean utcTimestamp,
      boolean caseSensitive,
      Configuration conf,
      LogicalType[] selectedTypes,
      String[] selectedFieldNames,
      ColumnBatchGenerator generator,
      int batchSize,
      Path path,
      long splitStart,
      long splitLength,
      FilterPredicate filterPredicate,
      UnboundRecordFilter recordFilter) throws IOException {
    this.utcTimestamp = utcTimestamp;
    this.batchSize = batchSize;
    // read the footer for this split's byte range, then apply the predicate
    // push-down filter to prune row groups that cannot contain matching rows
    ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
    MessageType fileSchema = footer.getFileMetaData().getSchema();
    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);

    this.fileSchema = fileSchema;

    // clip the file schema to the selected fields; a null entry marks a selected
    // field that is not present in the file and is filtered out below
    Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
    int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray();
    Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new);

    this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new);
    this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet");
    this.reader = new ParquetFileReader(
        conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());

    long totalRowCount = 0;
    for (BlockMetaData block : blocks) {
      totalRowCount += block.getRowCount();
    }
    this.totalRowCount = totalRowCount;
    this.nextRow = 0;
    this.rowsInBatch = 0;
    this.rowsReturned = 0;

    checkSchema();

    this.writableVectors = createWritableVectors();
    // map the read vectors back to their positions in the full selected-field layout
    ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
    this.columnarBatch = generator.generate(columnVectors);
    this.row = new ColumnarRowData(columnarBatch);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java [118:165]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public ParquetColumnarRowSplitReader(
      boolean utcTimestamp,
      boolean caseSensitive,
      Configuration conf,
      LogicalType[] selectedTypes,
      String[] selectedFieldNames,
      ColumnBatchGenerator generator,
      int batchSize,
      Path path,
      long splitStart,
      long splitLength,
      FilterPredicate filterPredicate,
      UnboundRecordFilter recordFilter) throws IOException {
    this.utcTimestamp = utcTimestamp;
    this.batchSize = batchSize;
    // read the footer for this split's byte range, then apply the predicate
    // push-down filter to prune row groups that cannot contain matching rows
    ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
    MessageType fileSchema = footer.getFileMetaData().getSchema();
    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);

    this.fileSchema = fileSchema;

    // clip the file schema to the selected fields; a null entry marks a selected
    // field that is not present in the file and is filtered out below
    Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
    int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray();
    Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new);

    this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new);
    this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet");
    this.reader = new ParquetFileReader(
        conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());

    long totalRowCount = 0;
    for (BlockMetaData block : blocks) {
      totalRowCount += block.getRowCount();
    }
    this.totalRowCount = totalRowCount;
    this.nextRow = 0;
    this.rowsInBatch = 0;
    this.rowsReturned = 0;

    checkSchema();

    this.writableVectors = createWritableVectors();
    // map the read vectors back to their positions in the full selected-field layout
    ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
    this.columnarBatch = generator.generate(columnVectors);
    this.row = new ColumnarRowData(columnarBatch);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java [118:165]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public ParquetColumnarRowSplitReader(
      boolean utcTimestamp,
      boolean caseSensitive,
      Configuration conf,
      LogicalType[] selectedTypes,
      String[] selectedFieldNames,
      ColumnBatchGenerator generator,
      int batchSize,
      Path path,
      long splitStart,
      long splitLength,
      FilterPredicate filterPredicate,
      UnboundRecordFilter recordFilter) throws IOException {
    this.utcTimestamp = utcTimestamp;
    this.batchSize = batchSize;
    // read the footer for this split's byte range, then apply the predicate
    // push-down filter to prune row groups that cannot contain matching rows
    ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
    MessageType fileSchema = footer.getFileMetaData().getSchema();
    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);

    this.fileSchema = fileSchema;

    // clip the file schema to the selected fields; a null entry marks a selected
    // field that is not present in the file and is filtered out below
    Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
    int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray();
    Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new);

    this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new);
    this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet");
    this.reader = new ParquetFileReader(
        conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());

    long totalRowCount = 0;
    for (BlockMetaData block : blocks) {
      totalRowCount += block.getRowCount();
    }
    this.totalRowCount = totalRowCount;
    this.nextRow = 0;
    this.rowsInBatch = 0;
    this.rowsReturned = 0;

    checkSchema();

    this.writableVectors = createWritableVectors();
    // map the read vectors back to their positions in the full selected-field layout
    ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
    this.columnarBatch = generator.generate(columnVectors);
    this.row = new ColumnarRowData(columnarBatch);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java [118:165]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public ParquetColumnarRowSplitReader(
      boolean utcTimestamp,
      boolean caseSensitive,
      Configuration conf,
      LogicalType[] selectedTypes,
      String[] selectedFieldNames,
      ColumnBatchGenerator generator,
      int batchSize,
      Path path,
      long splitStart,
      long splitLength,
      FilterPredicate filterPredicate,
      UnboundRecordFilter recordFilter) throws IOException {
    this.utcTimestamp = utcTimestamp;
    this.batchSize = batchSize;
    // read the footer for this split's byte range, then apply the predicate
    // push-down filter to prune row groups that cannot contain matching rows
    ParquetMetadata footer = readFooter(conf, path, range(splitStart, splitStart + splitLength));
    MessageType fileSchema = footer.getFileMetaData().getSchema();
    FilterCompat.Filter filter = get(filterPredicate, recordFilter);
    List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);

    this.fileSchema = fileSchema;

    // clip the file schema to the selected fields; a null entry marks a selected
    // field that is not present in the file and is filtered out below
    Type[] types = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
    int[] requestedIndices = IntStream.range(0, types.length).filter(i -> types[i] != null).toArray();
    Type[] readTypes = Arrays.stream(requestedIndices).mapToObj(i -> types[i]).toArray(Type[]::new);

    this.requestedTypes = Arrays.stream(requestedIndices).mapToObj(i -> selectedTypes[i]).toArray(LogicalType[]::new);
    this.requestedSchema = Types.buildMessage().addFields(readTypes).named("flink-parquet");
    this.reader = new ParquetFileReader(
        conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());

    long totalRowCount = 0;
    for (BlockMetaData block : blocks) {
      totalRowCount += block.getRowCount();
    }
    this.totalRowCount = totalRowCount;
    this.nextRow = 0;
    this.rowsInBatch = 0;
    this.rowsReturned = 0;

    checkSchema();

    this.writableVectors = createWritableVectors();
    // map the read vectors back to their positions in the full selected-field layout
    ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
    this.columnarBatch = generator.generate(columnVectors);
    this.row = new ColumnarRowData(columnarBatch);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



