in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala [256:381]
  def genShuffleDependency(
      rdd: RDD[ColumnarBatch],
      childOutputAttributes: Seq[Attribute],
      projectOutputAttributes: Seq[Attribute],
      newPartitioning: Partitioning,
      serializer: Serializer,
      writeMetrics: Map[String, SQLMetric],
      metrics: Map[String, SQLMetric]): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
    // scalastyle:on argcount
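    // Bind each projected output attribute to its ordinal in the child output and
    // serialize the ordinals as a comma-separated byte string for the native layer.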
    lazy val requiredFields = if (projectOutputAttributes != null) {
      val outputFields = projectOutputAttributes.map {
        a: Attribute =>
          BindReferences
            .bindReference(a, childOutputAttributes)
            .asInstanceOf[BoundReference]
            .ordinal
      }
      outputFields.mkString(",").getBytes()
    } else {
      Array.empty[Byte]
    }
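    // Translate the Spark partitioning into the NativePartitioning descriptor
    // consumed by the native (CH) shuffle.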
    val nativePartitioning: NativePartitioning = newPartitioning match {
      case SinglePartition =>
        new NativePartitioning(
          GlutenShuffleUtils.SinglePartitioningShortName,
          1,
          Array.empty[Byte],
          Array.empty[Byte],
          requiredFields)
      case RoundRobinPartitioning(n) =>
        new NativePartitioning(
          GlutenShuffleUtils.RoundRobinPartitioningShortName,
          n,
          Array.empty[Byte],
          Array.empty[Byte],
          requiredFields)
      case HashPartitioning(_, _) =>
        buildHashPartitioning(
          newPartitioning.asInstanceOf[HashPartitioning],
          childOutputAttributes,
          projectOutputAttributes)
      case RangePartitioning(sortingExpressions, numPartitions) =>
        val rddForSampling = buildRangePartitionSampleRDD(
          rdd,
          RangePartitioning(sortingExpressions, numPartitions),
          childOutputAttributes)
        // We let Spark compute the range bounds here and then pass them to CH.
        val orderingAttributes = sortingExpressions.zipWithIndex.map {
          case (ord, i) =>
            ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
        }
        implicit val ordering: LazilyGeneratedOrdering =
          new LazilyGeneratedOrdering(orderingAttributes)
        val generator = new RangePartitionerBoundsGenerator(
          numPartitions,
          rddForSampling,
          sortingExpressions,
          childOutputAttributes)
        val rangeBoundsInfo = generator.getRangeBoundsJsonString
        val attributePos = if (projectOutputAttributes != null) {
          projectOutputAttributes.map(
            attr =>
              BindReferences
                .bindReference(attr, childOutputAttributes)
                .asInstanceOf[BoundReference]
                .ordinal)
        } else {
          Seq[Int]()
        }
        new NativePartitioning(
          GlutenShuffleUtils.RangePartitioningShortName,
          rangeBoundsInfo.boundsSize + 1,
          Array.empty[Byte],
          rangeBoundsInfo.json.getBytes,
          attributePos.mkString(",").getBytes
        )
      case p =>
        throw new IllegalStateException(s"Unknown partition type: ${p.getClass.toString}")
    }
    val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
      newPartitioning.numPartitions > 1
    // The RDD passed to ShuffleDependency should be in the form of key-value pairs.
    // ColumnarShuffleWriter computes partition ids from the ColumnarBatch on the
    // native side rather than reading the "key" part.
    // Thus, in columnar shuffle, the "key" part is never used.
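    // Round-robin output depends on the order of the input rows, so unless Spark sorts
    // the rows before repartitioning, the mapping below must be flagged as order sensitive.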
    val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition
    val rddWithPartitionKey: RDD[Product2[Int, ColumnarBatch]] =
      if (
        GlutenConfig.get.isUseColumnarShuffleManager
        || GlutenConfig.get.isUseCelebornShuffleManager
      ) {
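        // The columnar (or Celeborn) shuffle writer computes partition ids on the
        // native side, so every batch is paired with a dummy key of 0 here.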
        rdd.mapPartitionsWithIndexInternal(
          (_, cbIter) => cbIter.map(cb => (0, cb)),
          isOrderSensitive = isOrderSensitive)
      } else {
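        // Without a columnar-aware shuffle manager, split each batch into per-partition
        // blocks up front so the regular shuffle path can route them by key.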
        val options = buildPartitioningOptions(nativePartitioning)
        rdd.mapPartitionsWithIndexInternal(
          (_, cbIter) => {
            buildPartitionedBlockIterator(
              cbIter,
              options,
              writeMetrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN))
          },
          isOrderSensitive = isOrderSensitive
        )
      }
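    // PartitionIdPassthrough forwards the key as the partition id; the NativePartitioning
    // descriptor travels with the dependency for the native shuffle writer.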
    val dependency =
      new ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch](
        rddWithPartitionKey,
        new PartitionIdPassthrough(nativePartitioning.getNumPartitions),
        serializer,
        shuffleWriterProcessor = ShuffleExchangeExec.createShuffleWriteProcessor(writeMetrics),
        nativePartitioning = nativePartitioning,
        metrics = metrics
      )
    dependency
  }