in spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala [193:227]
/**
 * Decides whether `op` should be converted from Spark's row/columnar output into
 * Arrow-format Comet batches.
 *
 * Only leaf nodes are currently considered, so that every downstream operator gets
 * a chance to be converted to columnar as well. Leaf operators that already emit
 * columnar batches (e.g. Spark's vectorized readers) are likewise converted to
 * native Comet batches.
 *
 * @param conf the active SQL configuration used to read the relevant Comet flags
 * @param op the candidate plan node
 * @return true if `op` is a supported leaf node whose conversion is enabled by config
 */
def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
  // NOTE(review): the collected fallback reasons are not consumed here — presumably
  // reported elsewhere or intentionally dropped; confirm before relying on them.
  val fallbackReasons = new ListBuffer[String]()

  // Per-file-format decision for a Spark DS v1 scan.
  def convertV1Scan(scan: FileSourceScanExec): Boolean =
    scan.relation.fileFormat match {
      case _: CSVFileFormat => CometConf.COMET_CONVERT_FROM_CSV_ENABLED.get(conf)
      case _: JsonFileFormat => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
      case _: ParquetFileFormat => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
      case _ => isSparkToArrowEnabled(conf, op)
    }

  // Per-scan-implementation decision for a Spark DS v2 scan.
  def convertV2Scan(scan: BatchScanExec): Boolean =
    scan.scan match {
      case _: CSVScan => CometConf.COMET_CONVERT_FROM_CSV_ENABLED.get(conf)
      case _: JsonScan => CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf)
      case _: ParquetScan => CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.get(conf)
      case _ => isSparkToArrowEnabled(conf, op)
    }

  // Schema support is a precondition; `&&` short-circuits so the operator match
  // is only evaluated for supported schemas, exactly as the nested form would.
  CometSparkToColumnarExec.isSchemaSupported(op.schema, fallbackReasons) && (op match {
    // Convert Spark DS v1 scan to Arrow format
    case scan: FileSourceScanExec => convertV1Scan(scan)
    // Convert Spark DS v2 scan to Arrow format
    case scan: BatchScanExec => convertV2Scan(scan)
    // Other leaf nodes fall back to the generic Spark-to-Arrow switch.
    case _: LeafExecNode => isSparkToArrowEnabled(conf, op)
    // TODO: consider converting other intermediate operators to columnar.
    case _ => false
  })
}