override def supportBatch() and override def buildReaderWithPartitionValues()

in spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala [73:216]


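  // Comet's scan always produces data as ColumnarBatches, so columnar (batch) reads are
  // supported regardless of the schema.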
  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = true

  override def buildReaderWithPartitionValues(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
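    // Propagate the relevant SQL configs into the Hadoop conf and broadcast it so the
    // per-file reader function below can read them on the executors.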
    val sqlConf = sparkSession.sessionState.conf
    CometParquetFileFormat.populateConf(sqlConf, hadoopConf)
    val broadcastedHadoopConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

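    // Capture the settings needed by the reader as local vals so that only these values, not
    // the whole SQLConf, are serialized with the closure returned below.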
    val isCaseSensitive = sqlConf.caseSensitiveAnalysis
    val useFieldId = CometParquetUtils.readFieldId(sqlConf)
    val ignoreMissingIds = CometParquetUtils.ignoreMissingIds(sqlConf)
    val pushDownDate = sqlConf.parquetFilterPushDownDate
    val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
    val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
    val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
    val optionsMap = CaseInsensitiveMap[String](options)
    val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
    val parquetFilterPushDown = sqlConf.parquetFilterPushDown

    // Comet specific configurations
    val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
    val nativeIcebergCompat =
      CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

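    // The function below is invoked on executors, once per PartitionedFile (file split).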
    (file: PartitionedFile) => {
      val sharedConf = broadcastedHadoopConf.value.value
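      // Read the Parquet footer once up front; its metadata drives both the datetime rebase
      // detection and the filter conversion below.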
      val footer = FooterReader.readFooter(sharedConf, file)
      val footerFileMetaData = footer.getFileMetaData
      val datetimeRebaseSpec = CometParquetFileFormat.getDatetimeRebaseSpec(
        file,
        requiredSchema,
        sharedConf,
        footerFileMetaData,
        datetimeRebaseModeInRead)

      val parquetSchema = footerFileMetaData.getSchema
      val parquetFilters = new ParquetFilters(
        parquetSchema,
        dataSchema,
        pushDownDate,
        pushDownTimestamp,
        pushDownDecimal,
        pushDownStringPredicate,
        pushDownInFilterThreshold,
        isCaseSensitive,
        datetimeRebaseSpec)

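      // Choose the reader implementation: NativeBatchReader when the scan impl is
      // native_iceberg_compat, otherwise Comet's default BatchReader.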
      val recordBatchReader =
        if (nativeIcebergCompat) {
          // We still need the predicate in the conf to allow us to generate row indexes based on
          // the actual row groups read
          val pushed = if (parquetFilterPushDown) {
            filters
              // Collects all converted Parquet filter predicates. Notice that not all predicates
              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
              // a `flatMap` is used here.
              .flatMap(parquetFilters.createFilter)
              .reduceOption(FilterApi.and)
          } else {
            None
          }
          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))
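          // Also convert the filters into Comet's native filter representation, which is
          // handed to the native reader so it can apply the predicate itself.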
          val pushedNative = if (parquetFilterPushDown) {
            parquetFilters.createNativeFilters(filters)
          } else {
            None
          }
          val batchReader = new NativeBatchReader(
            sharedConf,
            file,
            footer,
            pushedNative.orNull,
            capacity,
            requiredSchema,
            dataSchema,
            isCaseSensitive,
            useFieldId,
            ignoreMissingIds,
            datetimeRebaseSpec.mode == CORRECTED,
            partitionSchema,
            file.partitionValues,
            JavaConverters.mapAsJavaMap(metrics))
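          // If initialization fails, close the reader to release its resources before
          // rethrowing.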
          try {
            batchReader.init()
          } catch {
            case e: Throwable =>
              batchReader.close()
              throw e
          }
          batchReader
        } else {
          val pushed = if (parquetFilterPushDown) {
            filters
              // Collects all converted Parquet filter predicates. Notice that not all predicates
              // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why
              // a `flatMap` is used here.
              .flatMap(parquetFilters.createFilter)
              .reduceOption(FilterApi.and)
          } else {
            None
          }
          pushed.foreach(p => ParquetInputFormat.setFilterPredicate(sharedConf, p))

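          // Default path: use Comet's BatchReader to produce ColumnarBatches for this split.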
          val batchReader = new BatchReader(
            sharedConf,
            file,
            footer,
            capacity,
            requiredSchema,
            isCaseSensitive,
            useFieldId,
            ignoreMissingIds,
            datetimeRebaseSpec.mode == CORRECTED,
            partitionSchema,
            file.partitionValues,
            JavaConverters.mapAsJavaMap(metrics))
          try {
            batchReader.init()
          } catch {
            case e: Throwable =>
              batchReader.close()
              throw e
          }
          batchReader
        }
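      // Spark's contract when supportBatch is true: the reader yields ColumnarBatches through
      // the Iterator[InternalRow] interface, hence the unchecked cast below.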
      val iter = new RecordReaderIterator(recordBatchReader)
      try {
        iter.asInstanceOf[Iterator[InternalRow]]
      } catch {
        case e: Throwable =>
          iter.close()
          throw e
      }
    }
  }
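
A minimal usage sketch of the branch selection above (assuming the CometConf entries expose a
Spark-style .key field, that spark is an active SparkSession with the Comet extensions enabled,
and that the file path is hypothetical):

  // Route scans through the iceberg-compat (native) branch and set the Comet batch size.
  // Assumption: CometConf entries expose a .key, as Spark's ConfigEntry does.
  spark.conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
  spark.conf.set(CometConf.COMET_BATCH_SIZE.key, "8192")
  spark.read.parquet("/tmp/example.parquet").filter("id > 10").show()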