in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala [439:555]
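/**
 * Generates bucketed input partitions for a bucketed ClickHouse MergeTree table: the selected
 * part files are resolved through the snapshot cache, converted to mark ranges, grouped by
 * bucket id, and every non-empty, non-pruned bucket is appended to `partitions` as a
 * GlutenMergeTreePartition. Coalesced buckets (`optionalNumCoalescedBuckets`) are not supported.
 */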
def genBucketedInputPartitionSeq(
engine: String,
database: String,
tableName: String,
snapshotId: String,
relativeTablePath: String,
absoluteTablePath: String,
bucketSpec: BucketSpec,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
selectedPartitions: Array[PartitionDirectory],
tableSchema: StructType,
partitions: ArrayBuffer[InputPartition],
table: ClickHouseTableV2,
clickhouseTableConfigs: Map[String, String],
output: Seq[Attribute],
filterExprs: Seq[Expression],
sparkSession: SparkSession): Unit = {
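// Map each selected data file to its cached AddMergeTreeParts entry, failing fast if the
// path is no longer present in the snapshot cache.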
val selectPartsFiles = selectedPartitions
.flatMap(
partition =>
partition.files.map(
fs => {
val path = fs.getPath.toString
val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path)
if (ret == null) {
val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet()
val keySample = if (keys.isEmpty) {
"<empty>"
} else {
keys.iterator().next()
}
throw new IllegalStateException(
"Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " +
path + ". This happens when too many new entries are added to " +
"pathToAddMTPCache during current query. " +
"Try rerun current query. Existing KeySample: " + keySample)
}
ret
}))
.toSeq
if (selectPartsFiles.isEmpty) {
return
}
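// Compute the mark ranges to scan for the selected parts, taking the pushed-down filter
// expressions into account.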
val selectRanges: Seq[MergeTreePartRange] =
getMergeTreePartRange(
selectPartsFiles,
snapshotId,
database,
tableName,
relativeTablePath,
absoluteTablePath,
tableSchema,
table,
clickhouseTableConfigs,
filterExprs,
output,
sparkSession
)
if (selectRanges.isEmpty) {
return
}
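// Group the part ranges by bucket id; if a bucket set was provided, keep only the buckets
// it contains.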
val bucketGroupParts = selectRanges.groupBy(p => Integer.parseInt(p.bucketNum))
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
bucketGroupParts.filter(f => bucketSet.get(f._1))
} else {
bucketGroupParts
}
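// Bucket coalescing is not supported by the ClickHouse backend.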
if (optionalNumCoalescedBuckets.isDefined) {
throw new UnsupportedOperationException(
"The CH backend does not currently support coalesced buckets.")
}
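// For each bucket that still has part ranges, build a GlutenMergeTreePartition and append it
// to the output buffer.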
(0 until bucketSpec.numBuckets).foreach {
bucketId =>
val currBucketParts: Seq[MergeTreePartRange] =
prunedFilesGroupedToBuckets.getOrElse(bucketId, Seq.empty)
if (currBucketParts.nonEmpty) {
val currentFiles = currBucketParts.map {
part =>
MergeTreePartSplit(
part.name,
part.dirName,
part.targetNode,
part.start,
part.marks,
part.size)
}
val newPartition = GlutenMergeTreePartition(
partitions.size,
engine,
database,
tableName,
snapshotId,
relativeTablePath,
absoluteTablePath,
table.orderByKey,
table.lowCardKey,
table.minmaxIndexKey,
table.bfIndexKey,
table.setIndexKey,
table.primaryKey,
currentFiles.toArray,
tableSchema,
clickhouseTableConfigs
)
partitions += newPartition
}
}
}