in spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala [2844:2884]
def columnarShuffleSupported(s: ShuffleExchangeExec): (Boolean, String) = {
  val inputs = s.child.output
  val partitioning = s.outputPartitioning
  var msg = ""
  val supported = partitioning match {
    case HashPartitioning(expressions, _) =>
      // columnar shuffle supports the same data types (including complex types) both for
      // partition keys and for other columns
      val supported =
        expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
          expressions.forall(e => supportedShuffleDataType(e.dataType)) &&
          inputs.forall(attr => supportedShuffleDataType(attr.dataType))
      if (!supported) {
        msg = s"unsupported Spark partitioning expressions: $expressions"
      }
      supported
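    // Single-partition and round-robin shuffles have no partition key expressions,
    // so only the child output column types need to be supported.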
    case SinglePartition =>
      inputs.forall(attr => supportedShuffleDataType(attr.dataType))
    case RoundRobinPartitioning(_) =>
      inputs.forall(attr => supportedShuffleDataType(attr.dataType))
    case RangePartitioning(orderings, _) =>
      val supported =
        orderings.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
          orderings.forall(e => supportedShuffleDataType(e.dataType)) &&
          inputs.forall(attr => supportedShuffleDataType(attr.dataType))
      if (!supported) {
        msg = s"unsupported Spark partitioning expressions: $orderings"
      }
      supported
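    // Any other partitioning scheme is not supported by columnar shuffle.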
    case _ =>
      msg = s"unsupported Spark partitioning: ${partitioning.getClass.getName}"
      false
  }
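  // When unsupported, log a warning and return the reason string to the caller
  // (the reason stays empty for the single-partition and round-robin cases).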
  if (!supported) {
    emitWarning(msg)
    (false, msg)
  } else {
    (true, null)
  }
}
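
A minimal usage sketch of how the (Boolean, String) result might be consumed. This is an illustration only, not code from this file: the `plan` value and the `logWarning`-style logger are assumptions, and `columnarShuffleSupported` is assumed to be in scope.

// Illustration only: inspect a ShuffleExchangeExec and log why columnar shuffle
// would not be used.
plan match {
  case s: ShuffleExchangeExec =>
    val (isSupported, reason) = columnarShuffleSupported(s)
    if (!isSupported) {
      // `reason` carries the message built above, e.g. "unsupported Spark partitioning: ..."
      logWarning(s"Comet columnar shuffle not used: $reason")
    }
  case _ => // not a shuffle exchange, nothing to check
}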