in spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala [128:191]
/**
 * Tries to replace a DataSource V2 [[BatchScanExec]] with a Comet batch scan.
 *
 * Three kinds of underlying scan are distinguished:
 *  - `ParquetScan`: converted when the data schema and the partition schema are both reported
 *    as supported and no aggregate has been pushed into the scan.
 *  - `SupportsComet` (Iceberg): converted when the source reports Comet as enabled and the read
 *    schema is reported as supported.
 *  - anything else: never converted.
 *
 * @param scanExec the V2 batch scan node to inspect
 * @return a `CometBatchScanExec` wrapping the scan when every check passes; otherwise the result
 *         of `withInfo(scanExec, reasons)`, where `reasons` is the comma-joined list of fallback
 *         reasons collected along the way
 */
private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
  scanExec.scan match {
    case scan: ParquetScan =>
      val fallbackReasons = new ListBuffer[String]()

      // Every check runs unconditionally (no short-circuiting) so that all applicable fallback
      // reasons are reported together. The buffer is also handed to isSchemaSupported,
      // presumably so it can record finer-grained reasons of its own.
      val schemaSupported =
        CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
      if (!schemaSupported) {
        fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
      }

      val partitionSchemaSupported =
        CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
      if (!partitionSchemaSupported) {
        fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
      }

      val hasPushedAggregate = scan.pushedAggregate.nonEmpty
      if (hasPushedAggregate) {
        fallbackReasons += "Comet does not support pushed aggregate"
      }

      if (schemaSupported && partitionSchemaSupported && !hasPushedAggregate) {
        // `scan` is already typed as ParquetScan by the pattern above; no downcast is needed.
        CometBatchScanExec(
          scanExec.copy(scan = CometParquetScan(scan)),
          runtimeFilters = scanExec.runtimeFilters)
      } else {
        withInfo(scanExec, fallbackReasons.mkString(", "))
      }

    // Iceberg scan
    case s: SupportsComet =>
      val fallbackReasons = new ListBuffer[String]()
      if (!s.isCometEnabled) {
        fallbackReasons += "Comet extension is not enabled for " +
          s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side"
      }

      // Checked even when Comet is disabled on the source side so both reasons get reported.
      val schemaSupported =
        CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
      if (!schemaSupported) {
        fallbackReasons += "Comet extension is not enabled for " +
          s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
      }

      if (s.isCometEnabled && schemaSupported) {
        // When reading from Iceberg, we automatically enable type promotion.
        // NOTE(review): this writes to the active SQLConf as a side effect of planning, so the
        // setting remains in that conf after this scan has been transformed - confirm intended.
        SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
        CometBatchScanExec(
          scanExec.clone().asInstanceOf[BatchScanExec],
          runtimeFilters = scanExec.runtimeFilters)
      } else {
        withInfo(scanExec, fallbackReasons.mkString(", "))
      }

    case other =>
      withInfo(
        scanExec,
        s"Unsupported scan: ${other.getClass.getName}. " +
          "Comet Scan only supports Parquet and Iceberg Parquet file formats")
  }
}