private def getMergeTreePartRange()

in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala [566:682]


  private def getMergeTreePartRange(
      selectPartsFiles: Seq[AddMergeTreeParts],
      snapshotId: String,
      database: String,
      tableName: String,
      relativeTablePath: String,
      absoluteTablePath: String,
      tableSchema: StructType,
      table: ClickHouseTableV2,
      clickhouseTableConfigs: Map[String, String],
      filterExprs: Seq[Expression],
      output: Seq[Attribute],
      sparkSession: SparkSession): Seq[MergeTreePartRange] = {

    if (useDriverFilter(filterExprs, sparkSession)) {
      val size_per_mark = selectPartsFiles.map(part => (part.size, part.marks)).unzip match {
        case (l1, l2) => l1.sum / l2.sum
      }

      val extensionTableNode = ExtensionTableBuilder
        .makeExtensionTable(
          database,
          tableName,
          snapshotId,
          relativeTablePath,
          absoluteTablePath,
          table.orderByKey,
          table.lowCardKey,
          table.minmaxIndexKey,
          table.bfIndexKey,
          table.setIndexKey,
          table.primaryKey,
          PartSerializer.fromAddMergeTreeParts(selectPartsFiles),
          tableSchema,
          clickhouseTableConfigs.asJava,
          new JArrayList[String]()
        )

      val transformer = filterExprs
        .map {
          case ar: AttributeReference if ar.dataType == BooleanType =>
            EqualNullSafe(ar, Literal.TrueLiteral)
          case e => e
        }
        .reduceLeftOption(And)
        .map(ExpressionConverter.replaceWithExpressionTransformer(_, output))

      val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
      val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
      val columnTypeNodes = output.map {
        attr =>
          if (table.partitionColumns.exists(_.equals(attr.name))) {
            new ColumnTypeNode(NamedStruct.ColumnType.PARTITION_COL)
          } else {
            new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL)
          }
      }.asJava
      val substraitContext = new SubstraitContext
      val enhancement =
        Any.pack(StringValue.newBuilder.setValue(extensionTableNode.getExtensionTableStr).build)
      val extensionNode = ExtensionBuilder.makeAdvancedExtension(enhancement)
      val readNode = RelBuilder.makeReadRel(
        typeNodes,
        nameList,
        columnTypeNodes,
        transformer.map(_.doTransform(substraitContext)).orNull,
        extensionNode,
        substraitContext,
        substraitContext.nextOperatorId("readRel")
      )

      val planBuilder = Plan.newBuilder
      substraitContext.registeredFunction.forEach(
        (k, v) => planBuilder.addExtensions(ExtensionBuilder.makeFunctionMapping(k, v).toProtobuf))

      val filter_ranges = CHDatasourceJniWrapper.filterRangesOnDriver(
        planBuilder.build().toByteArray,
        readNode.toProtobuf.toByteArray
      )

      val mapper: ObjectMapper = new ObjectMapper()
      val values: JArrayList[MergeTreePartFilterReturnedRange] =
        mapper.readValue(
          filter_ranges,
          new TypeReference[JArrayList[MergeTreePartFilterReturnedRange]]() {})

      val partMap = selectPartsFiles.map(part => (part.name, part)).toMap
      values.asScala
        .map(
          range => {
            val part = partMap.get(range.getPartName).orNull
            val marks = range.getEnd - range.getBegin
            MergeTreePartRange(
              part.name,
              part.dirName,
              part.targetNode,
              part.bucketNum,
              range.getBegin,
              marks,
              marks * size_per_mark)
          })
        .toSeq
    } else {
      selectPartsFiles
        .map(
          part =>
            MergeTreePartRange(
              part.name,
              part.dirName,
              part.targetNode,
              part.bucketNum,
              0,
              part.marks,
              part.size))
        .toSeq
    }
  }