public void init()

in common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java [232:408]


  public void init() throws URISyntaxException, IOException {

    useDecimal128 =
        conf.getBoolean(
            CometConf.COMET_USE_DECIMAL_128().key(),
            (Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());

    long start = file.start();
    long length = file.length();
    String filePath = file.filePath().toString();
    long fileSize = file.fileSize();

    ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new Path(filePath));

    if (start >= 0 && length >= 0) {
      builder = builder.withRange(start, start + length);
    }
    ParquetReadOptions readOptions = builder.build();

    // TODO: enable off-heap buffer when they are ready
    ReadOptions cometReadOptions = ReadOptions.builder(conf).build();

    Path path = new Path(new URI(filePath));
    try (FileReader fileReader =
        new FileReader(
            CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics)) {

      requestedSchema = footer.getFileMetaData().getSchema();
      MessageType fileSchema = requestedSchema;

      if (sparkSchema == null) {
        sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema);
      } else {
        requestedSchema =
            CometParquetReadSupport.clipParquetSchema(
                requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
        if (requestedSchema.getFieldCount() != sparkSchema.size()) {
          throw new IllegalArgumentException(
              String.format(
                  "Spark schema has %d columns while " + "Parquet schema has %d columns",
                  sparkSchema.size(), requestedSchema.getColumns().size()));
        }
      }

      String timeZoneId = conf.get("spark.sql.session.timeZone");
      // Native code uses "UTC" always as the timeZoneId when converting from spark to arrow schema.
      Schema arrowSchema = Utils$.MODULE$.toArrowSchema(sparkSchema, "UTC");
      byte[] serializedRequestedArrowSchema = serializeArrowSchema(arrowSchema);
      Schema dataArrowSchema = Utils$.MODULE$.toArrowSchema(dataSchema, "UTC");
      byte[] serializedDataArrowSchema = serializeArrowSchema(dataArrowSchema);

      // Create Column readers
      List<Type> fields = requestedSchema.getFields();
      List<Type> fileFields = fileSchema.getFields();
      int numColumns = fields.size();
      if (partitionSchema != null) numColumns += partitionSchema.size();
      columnReaders = new AbstractColumnReader[numColumns];

      // Initialize missing columns and use null vectors for them
      missingColumns = new boolean[numColumns];
      // We do not need the column index of the row index; but this method has the
      // side effect of throwing an exception if a column with the same name is
      // found which we do want (spark unit tests explicitly test for that).
      ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
      StructField[] nonPartitionFields = sparkSchema.fields();
      boolean hasRowIndexColumn = false;
      // Ranges of rows to read (needed iff row indexes are being read)
      List<BlockMetaData> blocks =
          FileReader.filterRowGroups(readOptions, footer.getBlocks(), fileReader);
      totalRowCount = fileReader.getFilteredRecordCount();
      if (totalRowCount == 0) {
        // all the data is filtered out.
        isInitialized = true;
        return;
      }
      long[] starts = new long[blocks.size()];
      long[] lengths = new long[blocks.size()];
      starts = new long[blocks.size()];
      lengths = new long[blocks.size()];
      int blockIndex = 0;
      for (BlockMetaData block : blocks) {
        long blockStart = block.getStartingPos();
        long blockLength = block.getCompressedSize();
        starts[blockIndex] = blockStart;
        lengths[blockIndex] = blockLength;
        blockIndex++;
      }
      for (int i = 0; i < fields.size(); i++) {
        Type field = fields.get(i);
        Optional<Type> optFileField =
            fileFields.stream().filter(f -> f.getName().equals(field.getName())).findFirst();
        if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
          // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
          // generated row indexes, rather than read from the file.
          // TODO(SPARK-40059): Allow users to include columns named
          //                    FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
          long[] rowIndices = FileReader.getRowIndices(blocks);
          columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
          hasRowIndexColumn = true;
          missingColumns[i] = true;
        } else if (optFileField.isPresent()) {
          // The column we are reading may be a complex type in which case we check if each field in
          // the requested type is in the file type (and the same data type)
          if (!isEqual(field, optFileField.get())) {
            throw new UnsupportedOperationException("Schema evolution is not supported");
          }
          missingColumns[i] = false;
        } else {
          if (field.getRepetition() == Type.Repetition.REQUIRED) {
            throw new IOException(
                "Required column '"
                    + field.getName()
                    + "' is missing"
                    + " in data file "
                    + filePath);
          }
          if (field.isPrimitive()) {
            ConstantColumnReader reader =
                new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
            columnReaders[i] = reader;
            missingColumns[i] = true;
          } else {
            // the column requested is not in the file, but the native reader can handle that
            // and will return nulls for all rows requested
            missingColumns[i] = false;
          }
        }
      }

      // Initialize constant readers for partition columns
      if (partitionSchema != null) {
        StructField[] partitionFields = partitionSchema.fields();
        for (int i = fields.size(); i < columnReaders.length; i++) {
          int fieldIndex = i - fields.size();
          StructField field = partitionFields[fieldIndex];
          ConstantColumnReader reader =
              new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128);
          columnReaders[i] = reader;
        }
      }

      vectors = new CometVector[numColumns];
      currentBatch = new ColumnarBatch(vectors);

      // For test purpose only
      // If the last external accumulator is `NumRowGroupsAccumulator`, the row group number to read
      // will be updated to the accumulator. So we can check if the row groups are filtered or not
      // in test case.
      // Note that this tries to get thread local TaskContext object, if this is called at other
      // thread, it won't update the accumulator.
      if (taskContext != null) {
        Option<AccumulatorV2<?, ?>> accu = getTaskAccumulator(taskContext.taskMetrics());
        if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
          @SuppressWarnings("unchecked")
          AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
          intAccum.add(blocks.size());
        }
      }

      int batchSize =
          conf.getInt(
              CometConf.COMET_BATCH_SIZE().key(),
              (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
      this.handle =
          Native.initRecordBatchReader(
              filePath,
              fileSize,
              starts,
              lengths,
              hasRowIndexColumn ? null : nativeFilter,
              serializedRequestedArrowSchema,
              serializedDataArrowSchema,
              timeZoneId,
              batchSize);
    }
    isInitialized = true;
  }