in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala [139:304]
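  /**
   * Builds the MergeTree input partitions for a scan: resolves the selected
   * parts from the snapshot cache, prunes them by bucket id, converts them to
   * mark ranges, and then splits the ranges either by file count (when every
   * part is small) or by mark-based bin packing.
   */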
  def genInputPartitionSeq(
      relation: HadoopFsRelation,
      engine: String,
      database: String,
      tableName: String,
      snapshotId: String,
      relativeTablePath: String,
      absoluteTablePath: String,
      optionalBucketSet: Option[BitSet],
      selectedPartitions: Array[PartitionDirectory],
      tableSchema: StructType,
      partitions: ArrayBuffer[InputPartition],
      table: ClickHouseTableV2,
      clickhouseTableConfigs: Map[String, String],
      output: Seq[Attribute],
      filterExprs: Seq[Expression],
      sparkSession: SparkSession): Unit = {
    val bucketingEnabled = sparkSession.sessionState.conf.bucketingEnabled
    val shouldProcess: String => Boolean = optionalBucketSet match {
      case Some(bucketSet) if bucketingEnabled =>
        name =>
          // find the bucket id in a name of the pattern:
          // "partition_col=1/00001/373c9386-92a4-44ef-baaf-a67e1530b602_0_006"
          name.split("/").dropRight(1).filterNot(_.contains("=")).map(_.toInt).forall(bucketSet.get)
      case _ =>
        _ => true
    }
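    // Resolve each selected file back to its AddMergeTreeParts entry through the
    // path-keyed snapshot cache; a miss means the entry was evicted while this
    // query ran, which is surfaced below as a retryable error.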
    val selectPartsFiles = selectedPartitions
      .flatMap(
        partition =>
          partition.files.map {
            fs =>
              val path = fs.getPath.toUri.toString
              val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path)
              if (ret == null) {
                val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet()
                val keySample = if (keys.isEmpty) {
                  "<empty>"
                } else {
                  keys.iterator().next()
                }
                throw new IllegalStateException(
                  "Can't find AddMergeTreeParts in cache pathToAddMTPCache for key: " +
                    path + ". This happens when too many new entries are added to " +
                    "pathToAddMTPCache during the current query. " +
                    "Try rerunning the current query. Existing key sample: " + keySample
                )
              }
              ret
          })
      .filter(part => shouldProcess(part.name))
      .toSeq
    if (selectPartsFiles.isEmpty) {
      return
    }
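    // Convert the selected parts into mark ranges, passing the pushed-down
    // filters so that non-matching ranges can be pruned before split planning.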
    val selectRanges: Seq[MergeTreePartRange] =
      getMergeTreePartRange(
        selectPartsFiles,
        snapshotId,
        database,
        tableName,
        relativeTablePath,
        absoluteTablePath,
        tableSchema,
        table,
        clickhouseTableConfigs,
        filterExprs,
        output,
        sparkSession
      )
    if (selectRanges.isEmpty) {
      return
    }
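    // Gather the inputs that drive split planning: the byte budget per split,
    // the cluster parallelism, and the small-file threshold.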
    val maxSplitBytes = getMaxSplitBytes(sparkSession, selectRanges)
    val totalCores = SparkResourceUtil.getTotalCores(relation.sparkSession.sessionState.conf)
    val isAllSmallFiles = selectRanges.forall(_.size < maxSplitBytes)
    val fileCntThreshold = relation.sparkSession.sessionState.conf
      .getConfString(
        CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD,
        CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT
      )
      .toInt
    val totalMarksThreshold = totalCores * fileCntThreshold
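    // Two planning strategies: when every part is smaller than maxSplitBytes and
    // the part count stays under totalCores * fileCntThreshold, group whole parts
    // by file count; otherwise fall back to mark-based bin packing below.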
    if (fileCntThreshold > 0 && isAllSmallFiles && selectRanges.size <= totalMarksThreshold) {
      var fileCnt = math.round((selectRanges.size * 1.0) / totalCores).toInt
      if (fileCnt < 1) fileCnt = 1
      val splitFiles = selectRanges
        .map {
          part =>
            MergeTreePartSplit(
              part.name,
              part.dirName,
              part.targetNode,
              part.start,
              part.marks,
              part.size
            )
        }
      genInputPartitionSeqByFileCnt(
        engine,
        database,
        tableName,
        snapshotId,
        relativeTablePath,
        absoluteTablePath,
        tableSchema,
        partitions,
        table,
        clickhouseTableConfigs,
        splitFiles,
        fileCnt
      )
    } else {
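      // Bin packing by marks: derive how many marks fit into one split from the
      // average bytes per mark, then cut every part's mark range into chunks of
      // at most that many marks.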
      val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
      val totalMarks = selectRanges.map(p => p.marks).sum
      val totalBytes = selectRanges.map(p => p.size).sum
      // = maxSplitBytes / (totalBytes / totalMarks) + 1, i.e. the number of
      // average-sized marks that fill one split, rounded up.
      val markCntPerPartition = maxSplitBytes * totalMarks / totalBytes + 1
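      // Worked example: with maxSplitBytes = 128 MiB, totalBytes = 1 GiB and
      // totalMarks = 8000, the average mark is 128 KiB, so
      // 134217728 * 8000 / 1073741824 + 1 = 1001 marks per partition.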
logInfo(s"Planning scan with bin packing, max mark: $markCntPerPartition")
val splitFiles = selectRanges
.flatMap {
part =>
val end = part.marks + part.start
(part.start until end by markCntPerPartition).map {
offset =>
val remaining = end - offset
val size = if (remaining > markCntPerPartition) markCntPerPartition else remaining
MergeTreePartSplit(
part.name,
part.dirName,
part.targetNode,
offset,
size,
size * part.size / part.marks)
}
}
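      // Compute soft-affinity locations for the splits so that the same part
      // ranges tend to be scheduled onto the same executors.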
      val (partNameWithLocation, locationDistinct) =
        calculatedLocationForSoftAffinity(splitFiles, relativeTablePath)
      genInputPartitionSeqBySplitFiles(
        engine,
        database,
        tableName,
        snapshotId,
        relativeTablePath,
        absoluteTablePath,
        tableSchema,
        partitions,
        table,
        clickhouseTableConfigs,
        splitFiles,
        openCostInBytes,
        maxSplitBytes,
        partNameWithLocation,
        locationDistinct
      )
    }
  }
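
  // A minimal, self-contained sketch of the mark-range splitting above, using
  // hypothetical simplified types (Part/Split stand in for the real
  // MergeTreePartRange/MergeTreePartSplit); illustrative only.
  object MarkSplitSketch {
    final case class Part(name: String, start: Long, marks: Long, sizeInBytes: Long)
    final case class Split(name: String, offset: Long, marks: Long, approxBytes: Long)

    // Mirrors the flatMap over selectRanges: cut [start, start + marks) into
    // chunks of at most markCntPerPartition marks, prorating bytes by marks.
    def split(part: Part, markCntPerPartition: Long): Seq[Split] = {
      val end = part.start + part.marks
      (part.start until end by markCntPerPartition).map {
        offset =>
          val size = math.min(markCntPerPartition, end - offset)
          Split(part.name, offset, size, size * part.sizeInBytes / part.marks)
      }
    }

    def main(args: Array[String]): Unit = {
      // 8000 marks over 1 GiB with 1001 marks per partition (the worked
      // example above) yields seven splits of 1001 marks and a final 993.
      val part = Part("all_1_1_0", start = 0L, marks = 8000L, sizeInBytes = 1L << 30)
      split(part, markCntPerPartition = 1001L).foreach(println)
    }
  }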