def genBucketedInputPartitionSeq()

in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala [439:555]


  def genBucketedInputPartitionSeq(
      engine: String,
      database: String,
      tableName: String,
      snapshotId: String,
      relativeTablePath: String,
      absoluteTablePath: String,
      bucketSpec: BucketSpec,
      optionalBucketSet: Option[BitSet],
      optionalNumCoalescedBuckets: Option[Int],
      selectedPartitions: Array[PartitionDirectory],
      tableSchema: StructType,
      partitions: ArrayBuffer[InputPartition],
      table: ClickHouseTableV2,
      clickhouseTableConfigs: Map[String, String],
      output: Seq[Attribute],
      filterExprs: Seq[Expression],
      sparkSession: SparkSession): Unit = {
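    // Flow: resolve the selected files to cached AddMergeTreeParts entries, build
    // mark ranges, group them by bucket, apply bucket pruning, then append one
    // GlutenMergeTreePartition per non-empty bucket to `partitions`.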

    val selectPartsFiles = selectedPartitions
      .flatMap(
        partition =>
          partition.files.map(
            fs => {
              val path = fs.getPath.toString
              val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path)
              if (ret == null) {
                val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet()
                val keySample = if (keys.isEmpty) {
                  "<empty>"
                } else {
                  keys.iterator().next()
                }
                throw new IllegalStateException(
                  "Cannot find AddMergeTreeParts in cache pathToAddMTPCache for key: " +
                    path + ". This happens when too many new entries are added to " +
                    "pathToAddMTPCache during the current query. " +
                    "Try rerunning the query. Existing key sample: " + keySample)
              }
              ret
            }))
      .toSeq

    if (selectPartsFiles.isEmpty) {
      return
    }

    // Convert the selected parts into per-part mark ranges; the filter
    // expressions are passed down so the backend can prune ranges.
    val selectRanges: Seq[MergeTreePartRange] =
      getMergeTreePartRange(
        selectPartsFiles,
        snapshotId,
        database,
        tableName,
        relativeTablePath,
        absoluteTablePath,
        tableSchema,
        table,
        clickhouseTableConfigs,
        filterExprs,
        output,
        sparkSession
      )

    if (selectRanges.isEmpty) {
      return
    }

    // Group the selected ranges by bucket number.
    val bucketGroupParts = selectRanges.groupBy(p => p.bucketNum.toInt)

    // Drop buckets that the optional bucket filter rules out.
    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
      val bucketSet = optionalBucketSet.get
      bucketGroupParts.filter { case (bucketId, _) => bucketSet.get(bucketId) }
    } else {
      bucketGroupParts
    }

    if (optionalNumCoalescedBuckets.isDefined) {
      throw new UnsupportedOperationException(
        "The CH backend does not currently support coalesced buckets.")
    }
    // Walk every bucket id; buckets emptied by pruning are skipped below.
    // (The original Seq.tabulate built a Seq[Unit] that was discarded; a
    // side-effecting foreach over the bucket ids is the idiomatic form.)
    (0 until bucketSpec.numBuckets).foreach {
      bucketId =>
        val currBucketParts: Seq[MergeTreePartRange] =
          prunedFilesGroupedToBuckets.getOrElse(bucketId, Seq.empty)
        if (currBucketParts.nonEmpty) {
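          // Turn each selected range into a MergeTreePartSplit for this bucket.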
          val currentFiles = currBucketParts.map {
            part =>
              MergeTreePartSplit(
                part.name,
                part.dirName,
                part.targetNode,
                part.start,
                part.marks,
                part.size)
          }
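          // The partition index is partitions.size, so indexes stay contiguous
          // even when empty buckets are skipped.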
          val newPartition = GlutenMergeTreePartition(
            partitions.size,
            engine,
            database,
            tableName,
            snapshotId,
            relativeTablePath,
            absoluteTablePath,
            table.orderByKey,
            table.lowCardKey,
            table.minmaxIndexKey,
            table.bfIndexKey,
            table.setIndexKey,
            table.primaryKey,
            currentFiles.toArray,
            tableSchema,
            clickhouseTableConfigs
          )
          partitions += newPartition
        }
    }
  }
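
For reference, a minimal sketch of a call site, assuming the enclosing `object MergeTreePartsPartitionsUtil` implied by the file name. The `buildBucketedPartitions` wrapper and every string literal in it are hypothetical placeholders, not taken from the Gluten sources; real callers obtain these inputs from the planner. The point to note is the accumulator style: results are appended to the mutable `partitions` buffer rather than returned.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.PartitionDirectory
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
// ClickHouseTableV2 is imported from the ClickHouse backend, as in the utility itself.

// Hypothetical wrapper; all literal argument values are illustrative.
def buildBucketedPartitions(
    table: ClickHouseTableV2,
    bucketSpec: BucketSpec,
    optionalBucketSet: Option[BitSet],
    selectedPartitions: Array[PartitionDirectory],
    tableSchema: StructType,
    output: Seq[Attribute],
    filterExprs: Seq[Expression],
    spark: SparkSession): Seq[InputPartition] = {
  val partitions = new ArrayBuffer[InputPartition]()
  MergeTreePartsPartitionsUtil.genBucketedInputPartitionSeq(
    engine = "MergeTree",                  // illustrative
    database = "default",                  // illustrative
    tableName = "events",                  // illustrative
    snapshotId = "snapshot-0",             // illustrative
    relativeTablePath = "events",          // illustrative
    absoluteTablePath = "/data/events",    // illustrative
    bucketSpec = bucketSpec,
    optionalBucketSet = optionalBucketSet,
    optionalNumCoalescedBuckets = None,    // anything else throws UnsupportedOperationException
    selectedPartitions = selectedPartitions,
    tableSchema = tableSchema,
    partitions = partitions,               // output accumulator, appended in place
    table = table,
    clickhouseTableConfigs = Map.empty,
    output = output,
    filterExprs = filterExprs,
    sparkSession = spark)
  partitions.toSeq                         // one GlutenMergeTreePartition per surviving bucket
}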