def columnarShuffleSupported()

in spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala [2844:2884]


  /**
   * Checks whether columnar shuffle can be used for the given shuffle exchange.
   *
   * The decision depends on the exchange's output partitioning:
   *  - hash / range partitioning: every partitioning expression must be convertible by
   *    `QueryPlanSerde.exprToProto`, and both the expressions and all child output columns
   *    must have a data type accepted by `supportedShuffleDataType`;
   *  - single-partition / round-robin partitioning: all child output columns must have a
   *    data type accepted by `supportedShuffleDataType`;
   *  - any other partitioning is rejected.
   *
   * When the shuffle is rejected, the reason is passed to `emitWarning` and returned to the
   * caller. The reason is never empty.
   *
   * @param s
   *   the shuffle exchange to inspect
   * @return
   *   `(true, null)` when columnar shuffle is supported, otherwise `(false, reason)`
   */
  def columnarShuffleSupported(s: ShuffleExchangeExec): (Boolean, String) = {
    val inputs = s.child.output
    val partitioning = s.outputPartitioning

    // A `def` (not a `val`) so that input columns are only inspected after the partitioning
    // expressions have passed their own checks, preserving the original evaluation order.
    def inputsSupported: Boolean =
      inputs.forall(attr => supportedShuffleDataType(attr.dataType))

    // Reason used when the partitioning itself is fine but some child output column has a
    // data type the shuffle cannot handle. Previously these cases reported an empty message.
    def unsupportedInputsMsg: String = {
      val offending = inputs
        .filterNot(attr => supportedShuffleDataType(attr.dataType))
        .map(attr => s"$attr: ${attr.dataType}")
      s"unsupported Spark shuffle input data types: ${offending.mkString(", ")}"
    }

    // None means "supported"; Some(reason) carries the explanation for falling back.
    val reason: Option[String] = partitioning match {
      case HashPartitioning(expressions, _) =>
        // columnar shuffle supports the same data types (including complex types) both for
        // partition keys and for other columns.
        // NOTE: `map` is intentionally eager so exprToProto is invoked for every expression,
        // even if an earlier one fails, exactly as before.
        val ok =
          expressions.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
            expressions.forall(e => supportedShuffleDataType(e.dataType)) &&
            inputsSupported
        if (ok) None else Some(s"unsupported Spark partitioning expressions: $expressions")

      case SinglePartition | RoundRobinPartitioning(_) =>
        // No partitioning expressions to validate; only the shuffled columns matter.
        if (inputsSupported) None else Some(unsupportedInputsMsg)

      case RangePartitioning(orderings, _) =>
        val ok =
          orderings.map(QueryPlanSerde.exprToProto(_, inputs)).forall(_.isDefined) &&
            orderings.forall(e => supportedShuffleDataType(e.dataType)) &&
            inputsSupported
        if (ok) None else Some(s"unsupported Spark partitioning expressions: $orderings")

      case _ =>
        Some(s"unsupported Spark partitioning: ${partitioning.getClass.getName}")
    }

    reason match {
      case Some(msg) =>
        emitWarning(msg)
        (false, msg)
      case None =>
        // `null` (rather than an empty string) is kept for compatibility with existing callers.
        (true, null)
    }
  }