in spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala [74:126]
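// A partition filter that references another query plan (PlanExpression) is a
// dynamic partition pruning (DPP) filter.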
private def isDynamicPruningFilter(e: Expression): Boolean =
  e.exists(_.isInstanceOf[PlanExpression[_]])

private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
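  // Keep the Spark scan when DPP fallback is enabled and the scan carries
  // dynamic partition pruning filters.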
  if (COMET_DPP_FALLBACK_ENABLED.get() &&
      scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
    return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
  }

  scanExec.relation match {
    case r: HadoopFsRelation =>
      val fallbackReasons = new ListBuffer[String]()

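      // Fall back if the file format is not supported by Comet.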
      if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
        fallbackReasons += s"Unsupported file format ${r.fileFormat}"
        return withInfo(scanExec, fallbackReasons.mkString(", "))
      }

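      // The fully native DataFusion scan also requires Comet native execution
      // to be enabled.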
      val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
      if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
        fallbackReasons +=
          s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
        return withInfo(scanExec, fallbackReasons.mkString(", "))
      }

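      // Check that both the data schema and the partition schema are supported
      // by the selected scan implementation.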
      val (schemaSupported, partitionSchemaSupported) = scanImpl match {
        case CometConf.SCAN_NATIVE_DATAFUSION =>
          (
            CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
            CometNativeScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
        case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
          (
            CometScanExec.isSchemaSupported(scanExec.requiredSchema, fallbackReasons),
            CometScanExec.isSchemaSupported(r.partitionSchema, fallbackReasons))
      }

      if (!schemaSupported) {
        fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl"
      }
      if (!partitionSchemaSupported) {
        fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
      }

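      // Convert to a Comet scan if both schemas are supported; otherwise record
      // the accumulated fallback reasons on the Spark scan.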
      if (schemaSupported && partitionSchemaSupported) {
        CometScanExec(scanExec, session)
      } else {
        withInfo(scanExec, fallbackReasons.mkString(", "))
      }

    case _ =>
      withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
  }
}