public void init()

in common/src/main/java/org/apache/comet/parquet/BatchReader.java [231:378]


  public void init() throws URISyntaxException, IOException {
    useDecimal128 =
        conf.getBoolean(
            CometConf.COMET_USE_DECIMAL_128().key(),
            (Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());
    useLazyMaterialization =
        conf.getBoolean(
            CometConf.COMET_USE_LAZY_MATERIALIZATION().key(),
            (Boolean) CometConf.COMET_USE_LAZY_MATERIALIZATION().defaultValue().get());

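    // Byte range of the file split assigned to this reader.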
    long start = file.start();
    long length = file.length();
    String filePath = file.filePath().toString();

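    // Build the Parquet read options; restrict reads to the split's byte range when it is known.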
    ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new Path(filePath));

    if (start >= 0 && length >= 0) {
      builder = builder.withRange(start, start + length);
    }
    ParquetReadOptions readOptions = builder.build();

    // TODO: enable off-heap buffers when they are ready
    ReadOptions cometReadOptions = ReadOptions.builder(conf).build();

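    // Open the Comet FileReader with the provided footer; the requested schema starts out as the
    // full file schema.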
    Path path = new Path(new URI(filePath));
    fileReader =
        new FileReader(
            CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics);
    requestedSchema = fileReader.getFileMetaData().getSchema();
    MessageType fileSchema = requestedSchema;

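    // If no Spark schema was supplied, derive one from the Parquet file schema; otherwise clip
    // the Parquet schema down to the requested Spark fields.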
    if (sparkSchema == null) {
      sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema);
    } else {
      requestedSchema =
          CometParquetReadSupport.clipParquetSchema(
              requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
      if (requestedSchema.getFieldCount() != sparkSchema.size()) {
        throw new IllegalArgumentException(
            String.format(
                "Spark schema has %d columns while Parquet schema has %d columns",
                sparkSchema.size(), requestedSchema.getFieldCount()));
      }
    }

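    // Allocate one column reader per requested Parquet column, plus one per partition column.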
    totalRowCount = fileReader.getRecordCount();
    List<ColumnDescriptor> columns = requestedSchema.getColumns();
    int numColumns = columns.size();
    if (partitionSchema != null) numColumns += partitionSchema.size();
    columnReaders = new AbstractColumnReader[numColumns];

    // Initialize missing columns and use null vectors for them
    missingColumns = new boolean[columns.size()];
    List<String[]> paths = requestedSchema.getPaths();
    // We do not need the column index of the row-index column, but this method has the
    // side effect of throwing an exception if a column with the same name is found,
    // which we do want (Spark unit tests explicitly test for that).
    ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
    StructField[] nonPartitionFields = sparkSchema.fields();
    for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
      Type t = requestedSchema.getFields().get(i);
      Preconditions.checkState(
          t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
          "Complex type is not supported");
      String[] colPath = paths.get(i);
      if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
        // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
        // generated row indexes, rather than read from the file.
        // TODO(SPARK-40059): Allow users to include columns named
        //                    FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
        long[] rowIndices = fileReader.getRowIndices();
        columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
        missingColumns[i] = true;
      } else if (fileSchema.containsPath(colPath)) {
        ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
        if (!fd.equals(columns.get(i))) {
          throw new UnsupportedOperationException("Schema evolution is not supported");
        }
        missingColumns[i] = false;
      } else {
        if (columns.get(i).getMaxDefinitionLevel() == 0) {
          throw new IOException(
              "Required column '"
                  + Arrays.toString(colPath)
                  + "' is missing"
                  + " in data file "
                  + filePath);
        }
        ConstantColumnReader reader =
            new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
        columnReaders[i] = reader;
        missingColumns[i] = true;
      }
    }

    // Initialize constant readers for partition columns
    if (partitionSchema != null) {
      StructField[] partitionFields = partitionSchema.fields();
      for (int i = columns.size(); i < columnReaders.length; i++) {
        int fieldIndex = i - columns.size();
        StructField field = partitionFields[fieldIndex];
        ConstantColumnReader reader =
            new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128);
        columnReaders[i] = reader;
      }
    }

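    // Allocate the output vectors and the reusable columnar batch, and tell the file reader which
    // columns to read.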
    vectors = new CometVector[numColumns];
    currentBatch = new ColumnarBatch(vectors);
    fileReader.setRequestedSchema(requestedSchema.getColumns());

    // For test purposes only.
    // If the last external accumulator is a `NumRowGroupsAcc`, the number of row groups to read
    // is added to the accumulator, so test cases can check whether row groups were filtered.
    // Note that this relies on the thread-local TaskContext object; if this method is called
    // from another thread, it won't update the accumulator.
    if (taskContext != null) {
      Option<AccumulatorV2<?, ?>> accu =
          ShimTaskMetrics.getTaskAccumulator(taskContext.taskMetrics());
      if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
        @SuppressWarnings("unchecked")
        AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
        intAccum.add(fileReader.getRowGroups().size());
      }
    }

    // Pre-fetching
    boolean preFetchEnabled =
        conf.getBoolean(
            CometConf.COMET_SCAN_PREFETCH_ENABLED().key(),
            (boolean) CometConf.COMET_SCAN_PREFETCH_ENABLED().defaultValue().get());

    if (preFetchEnabled) {
      LOG.info("Prefetch enabled for BatchReader.");
      this.prefetchQueue = new LinkedBlockingQueue<>();
    }

    isInitialized = true;
    synchronized (this) {
      // If prefetch is enabled, `init()` is called in a separate thread. When
      // `BatchReader.nextBatch()` is called asynchronously, `init()` may not have been
      // called yet or may not have finished. We need to hold off `nextBatch` until
      // initialization of `BatchReader` is done. Once initialization is about to
      // finish, we notify the thread waiting in `nextBatch` to continue.
      notifyAll();
    }
  }
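
The synchronization at the end of init() only works if BatchReader.nextBatch() blocks on the same monitor until initialization completes. The standalone sketch below is not Comet's actual implementation (the class, field, and method bodies are illustrative); it only shows the wait/notify handshake described in the comment above, with a prefetch thread running init() and a consumer blocking in nextBatch() until it is notified.

// Minimal sketch of the init()/nextBatch() handshake; names and bodies are illustrative only.
class AsyncInitExample {
  private volatile boolean initialized = false;

  void init() {
    // ... expensive initialization would happen here ...
    initialized = true;
    synchronized (this) {
      notifyAll(); // wake any consumer blocked in nextBatch()
    }
  }

  boolean nextBatch() throws InterruptedException {
    synchronized (this) {
      // Loop guards against spurious wakeups and against init() having finished already.
      while (!initialized) {
        wait();
      }
    }
    return true; // a real reader would decode and expose the next batch here
  }

  public static void main(String[] args) throws Exception {
    AsyncInitExample reader = new AsyncInitExample();
    Thread prefetch = new Thread(reader::init, "prefetch-init");
    prefetch.start();
    System.out.println("nextBatch() returned: " + reader.nextBatch());
    prefetch.join();
  }
}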