in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala [566:682]
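/**
 * Computes the mark ranges to scan for the selected MergeTree parts. When driver-side
 * filtering is enabled, the filter expressions are turned into a Substrait read plan and
 * evaluated natively on the driver to prune mark ranges; otherwise each part is returned
 * as a single range covering all of its marks.
 */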
private def getMergeTreePartRange(
    selectPartsFiles: Seq[AddMergeTreeParts],
    snapshotId: String,
    database: String,
    tableName: String,
    relativeTablePath: String,
    absoluteTablePath: String,
    tableSchema: StructType,
    table: ClickHouseTableV2,
    clickhouseTableConfigs: Map[String, String],
    filterExprs: Seq[Expression],
    output: Seq[Attribute],
    sparkSession: SparkSession): Seq[MergeTreePartRange] = {
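  // Driver-side filtering: evaluate the pushed-down filters natively to prune mark ranges.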
  if (useDriverFilter(filterExprs, sparkSession)) {
    // Average byte size of a single mark across all selected parts, used below to
    // approximate the size of each filtered range.
    val sizePerMark = selectPartsFiles.map(part => (part.size, part.marks)).unzip match {
      case (sizes, marks) => sizes.sum / marks.sum
    }
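    // Serialize the table metadata (keys, secondary indexes and the selected parts) into an
    // extension table string that the native engine can consume.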
    val extensionTableNode = ExtensionTableBuilder
      .makeExtensionTable(
        database,
        tableName,
        snapshotId,
        relativeTablePath,
        absoluteTablePath,
        table.orderByKey,
        table.lowCardKey,
        table.minmaxIndexKey,
        table.bfIndexKey,
        table.setIndexKey,
        table.primaryKey,
        PartSerializer.fromAddMergeTreeParts(selectPartsFiles),
        tableSchema,
        clickhouseTableConfigs.asJava,
        new JArrayList[String]()
      )
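    // Rewrite bare boolean attributes (a filter that is just `col`) as `col <=> true`,
    // conjoin all filters with AND, and convert the result into an expression transformer.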
    val transformer = filterExprs
      .map {
        case ar: AttributeReference if ar.dataType == BooleanType =>
          EqualNullSafe(ar, Literal.TrueLiteral)
        case e => e
      }
      .reduceLeftOption(And)
      .map(ExpressionConverter.replaceWithExpressionTransformer(_, output))
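    // Collect the output attributes' types and names as Substrait nodes, tagging each
    // column as a partition column or a normal column.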
    val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
    val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
    val columnTypeNodes = output.map {
      attr =>
        if (table.partitionColumns.exists(_.equals(attr.name))) {
          new ColumnTypeNode(NamedStruct.ColumnType.PARTITION_COL)
        } else {
          new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL)
        }
    }.asJava
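    // Pack the serialized extension table into an advanced extension so it travels with
    // the read rel to the native side.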
    val substraitContext = new SubstraitContext
    val enhancement =
      Any.pack(StringValue.newBuilder.setValue(extensionTableNode.getExtensionTableStr).build)
    val extensionNode = ExtensionBuilder.makeAdvancedExtension(enhancement)
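    // Build a Substrait read rel carrying the output schema, the optional filter and the
    // extension table.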
    val readNode = RelBuilder.makeReadRel(
      typeNodes,
      nameList,
      columnTypeNodes,
      transformer.map(_.doTransform(substraitContext)).orNull,
      extensionNode,
      substraitContext,
      substraitContext.nextOperatorId("readRel")
    )
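    // Register the functions referenced by the filter in the plan's function mapping, then
    // ask the native engine to filter the mark ranges on the driver.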
    val planBuilder = Plan.newBuilder
    substraitContext.registeredFunction.forEach(
      (k, v) => planBuilder.addExtensions(ExtensionBuilder.makeFunctionMapping(k, v).toProtobuf))
    val filterRanges = CHDatasourceJniWrapper.filterRangesOnDriver(
      planBuilder.build().toByteArray,
      readNode.toProtobuf.toByteArray
    )
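    // The native call returns the surviving ranges as JSON; deserialize them with Jackson.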
    val mapper: ObjectMapper = new ObjectMapper()
    val values: JArrayList[MergeTreePartFilterReturnedRange] =
      mapper.readValue(
        filterRanges,
        new TypeReference[JArrayList[MergeTreePartFilterReturnedRange]]() {})
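    // Map each surviving range back to its part (every returned part name is expected to be
    // present among the selected parts) and estimate its byte size from the average mark size.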
    val partMap = selectPartsFiles.map(part => (part.name, part)).toMap
    values.asScala
      .map {
        range =>
          val part = partMap.get(range.getPartName).orNull
          val marks = range.getEnd - range.getBegin
          MergeTreePartRange(
            part.name,
            part.dirName,
            part.targetNode,
            part.bucketNum,
            range.getBegin,
            marks,
            marks * sizePerMark)
      }
      .toSeq
  } else {
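    // Driver filtering disabled: return each part as a single range covering all of its marks.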
    selectPartsFiles
      .map(
        part =>
          MergeTreePartRange(
            part.name,
            part.dirName,
            part.targetNode,
            part.bucketNum,
            0,
            part.marks,
            part.size))
      .toSeq
  }
}