def genShuffleDependency()

in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala [256:381]


  def genShuffleDependency(
      rdd: RDD[ColumnarBatch],
      childOutputAttributes: Seq[Attribute],
      projectOutputAttributes: Seq[Attribute],
      newPartitioning: Partitioning,
      serializer: Serializer,
      writeMetrics: Map[String, SQLMetric],
      metrics: Map[String, SQLMetric]): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
    // scalastyle:on argcount
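    // If a projection is fused into this shuffle, bind each projected attribute to its
    // ordinal in the child output and serialize the ordinals as comma-separated bytes
    // for the native writer; otherwise pass an empty array.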
    lazy val requiredFields = if (projectOutputAttributes != null) {
      val outputFields = projectOutputAttributes.map {
        a: Attribute =>
          BindReferences
            .bindReference(a, childOutputAttributes)
            .asInstanceOf[BoundReference]
            .ordinal
      }
      outputFields.mkString(",").getBytes()
    } else {
      Array.empty[Byte]
    }
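    // Translate Spark's output partitioning into the NativePartitioning descriptor
    // consumed by the native shuffle writer.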
    val nativePartitioning: NativePartitioning = newPartitioning match {
      case SinglePartition =>
        new NativePartitioning(
          GlutenShuffleUtils.SinglePartitioningShortName,
          1,
          Array.empty[Byte],
          Array.empty[Byte],
          requiredFields)
      case RoundRobinPartitioning(n) =>
        new NativePartitioning(
          GlutenShuffleUtils.RoundRobinPartitioningShortName,
          n,
          Array.empty[Byte],
          Array.empty[Byte],
          requiredFields)
      case hashPartitioning: HashPartitioning =>
        buildHashPartitioning(hashPartitioning, childOutputAttributes, projectOutputAttributes)
      case RangePartitioning(sortingExpressions, numPartitions) =>
        val rddForSampling = buildRangePartitionSampleRDD(
          rdd,
          RangePartitioning(sortingExpressions, numPartitions),
          childOutputAttributes)

        // We let Spark compute the range bounds here, and then pass them to CH.
        val orderingAttributes = sortingExpressions.zipWithIndex.map {
          case (ord, i) =>
            ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
        }
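        // Declared implicit so the bounds generator below can order the sampled rows.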
        implicit val ordering: LazilyGeneratedOrdering =
          new LazilyGeneratedOrdering(orderingAttributes)
        val generator = new RangePartitionerBoundsGenerator(
          numPartitions,
          rddForSampling,
          sortingExpressions,
          childOutputAttributes)
        val rangeBoundsInfo = generator.getRangeBoundsJsonString
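        // Same as requiredFields above: map a fused projection to ordinals in the
        // child output so the native side knows which columns to materialize.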
        val attributePos = if (projectOutputAttributes != null) {
          projectOutputAttributes.map(
            attr =>
              BindReferences
                .bindReference(attr, childOutputAttributes)
                .asInstanceOf[BoundReference]
                .ordinal)
        } else {
          Seq[Int]()
        }
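        // N range bounds split the rows into N + 1 partitions.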
        new NativePartitioning(
          GlutenShuffleUtils.RangePartitioningShortName,
          rangeBoundsInfo.boundsSize + 1,
          Array.empty[Byte],
          rangeBoundsInfo.json.getBytes,
          attributePos.mkString(",").getBytes
        )
      case p =>
        throw new IllegalStateException(s"Unknown partitioning type: ${p.getClass.toString}")
    }

    val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
      newPartitioning.numPartitions > 1

    // The RDD passed to ShuffleDependency should be in the form of key-value pairs.
    // ColumnarShuffleWriter computes partition ids from the ColumnarBatch on the
    // native side rather than reading the "key" part.
    // Thus, in columnar shuffle the "key" part is never used.
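    // Round-robin distribution depends on the input row order unless rows are sorted
    // before repartitioning, so the transformations below are marked order sensitive
    // in that case.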
    val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition

    val rddWithPartitionKey: RDD[Product2[Int, ColumnarBatch]] =
      if (
        GlutenConfig.get.isUseColumnarShuffleManager
        || GlutenConfig.get.isUseCelebornShuffleManager
      ) {
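        // Partition ids are computed by the native shuffle writer, so every batch is
        // tagged with a dummy key of 0 here.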
        rdd.mapPartitionsWithIndexInternal(
          (_, cbIter) => cbIter.map(cb => (0, cb)),
          isOrderSensitive = isOrderSensitive)
      } else {
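        // Without the columnar shuffle manager, split each batch into per-partition
        // blocks up front so Spark's default shuffle can write them as
        // (partition id, batch) pairs.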
        val options = buildPartitioningOptions(nativePartitioning)
        rdd.mapPartitionsWithIndexInternal(
          (_, cbIter) => {
            buildPartitionedBlockIterator(
              cbIter,
              options,
              writeMetrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN))
          },
          isOrderSensitive = isOrderSensitive
        )
      }

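    // PartitionIdPassthrough maps each record's key directly to its partition id; the
    // actual partitioning was decided above (natively, or via the pre-partitioned
    // block iterator).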
    val dependency =
      new ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch](
        rddWithPartitionKey,
        new PartitionIdPassthrough(nativePartitioning.getNumPartitions),
        serializer,
        shuffleWriterProcessor = ShuffleExchangeExec.createShuffleWriteProcessor(writeMetrics),
        nativePartitioning = nativePartitioning,
        metrics = metrics
      )

    dependency
  }
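
A minimal usage sketch (not part of the source file): how a caller such as a columnar
shuffle exchange might invoke genShuffleDependency. The child plan, serializer, and
metric maps are assumptions supplied by the caller; only the parameter names are taken
from the signature above.

  // Hypothetical call site; `childPlan`, `serializer` and `metrics` are assumptions.
  val writeMetrics =
    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(childPlan.sparkContext)
  val dependency = CHExecUtil.genShuffleDependency(
    rdd = childPlan.executeColumnar(),
    childOutputAttributes = childPlan.output,
    projectOutputAttributes = null, // no projection fused into the shuffle
    newPartitioning = HashPartitioning(childPlan.output.take(1), numPartitions = 200),
    serializer = serializer,
    writeMetrics = writeMetrics,
    metrics = metrics)
  // The resulting ColumnarShuffleDependency is then registered with the shuffle
  // manager (for example through a shuffled columnar-batch RDD) by the exchange node.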