def genInputPartitionSeq()

in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala [139:304]


  /**
   * Plans the scan over the selected MergeTree parts and passes the resulting splits, together
   * with the `partitions` buffer, to one of the two partition generators.
   *
   * Steps:
   *   1. Every file of `selectedPartitions` is resolved through
   *      `ClickhouseSnapshot.pathToAddMTPCache`. When bucketing is enabled and
   *      `optionalBucketSet` is defined, only parts whose bucket directory id is in the set are
   *      kept.
   *   1. The remaining parts are turned into mark ranges by `getMergeTreePartRange`.
   *   1. If every range is smaller than the max split size and the number of ranges does not
   *      exceed `totalCores * fileCntThreshold`, whole parts are grouped by count via
   *      `genInputPartitionSeqByFileCnt`. Otherwise each range is cut into chunks of marks and
   *      handed to `genInputPartitionSeqBySplitFiles` (the "bin packing" path).
   *
   * Returns without calling a generator when no part or no range is selected.
   *
   * NOTE(review): `partitions` is presumably filled by the generators - confirm in
   * `genInputPartitionSeqByFileCnt` / `genInputPartitionSeqBySplitFiles`.
   *
   * @throws IllegalStateException
   *   if a selected file path has no entry in `pathToAddMTPCache`
   */
  def genInputPartitionSeq(
      relation: HadoopFsRelation,
      engine: String,
      database: String,
      tableName: String,
      snapshotId: String,
      relativeTablePath: String,
      absoluteTablePath: String,
      optionalBucketSet: Option[BitSet],
      selectedPartitions: Array[PartitionDirectory],
      tableSchema: StructType,
      partitions: ArrayBuffer[InputPartition],
      table: ClickHouseTableV2,
      clickhouseTableConfigs: Map[String, String],
      output: Seq[Attribute],
      filterExprs: Seq[Expression],
      sparkSession: SparkSession): Unit = {

    val bucketingEnabled = sparkSession.sessionState.conf.bucketingEnabled
    // Predicate on the part name deciding whether the part survives bucket pruning.
    val shouldProcess: String => Boolean = optionalBucketSet match {
      case Some(bucketSet) if bucketingEnabled =>
        name =>
          // Find the bucket id in a name following the pattern:
          // "partition_col=1/00001/373c9386-92a4-44ef-baaf-a67e1530b602_0_006"
          // i.e. drop the trailing part id and every "col=value" partition segment; whatever is
          // left is parsed as a bucket id and must be present in the bucket set.
          name.split("/").dropRight(1).filterNot(_.contains("=")).map(_.toInt).forall(bucketSet.get)
      case _ =>
        _ => true
    }

    val selectPartsFiles = selectedPartitions
      .flatMap(
        partition =>
          partition.files.map(
            fs => {
              val path = fs.getPath.toUri.toString

              // getIfPresent yields null on a cache miss; Option(...) maps that to None.
              Option(ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path)).getOrElse {
                // Include one existing key in the error to help diagnose key mismatches.
                val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet()
                val keySample = if (keys.isEmpty) "<empty>" else keys.iterator().next()
                throw new IllegalStateException(
                  "Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " +
                    path + ". This happens when too many new entries are added to " +
                    "pathToAddMTPCache during current query. " +
                    "Try rerun current query. Existing KeySample: " + keySample
                )
              }
            }))
      .filter(part => shouldProcess(part.name))
      .toSeq
    if (selectPartsFiles.isEmpty) {
      return
    }

    val selectRanges: Seq[MergeTreePartRange] =
      getMergeTreePartRange(
        selectPartsFiles,
        snapshotId,
        database,
        tableName,
        relativeTablePath,
        absoluteTablePath,
        tableSchema,
        table,
        clickhouseTableConfigs,
        filterExprs,
        output,
        sparkSession
      )

    if (selectRanges.isEmpty) {
      return
    }

    val maxSplitBytes = getMaxSplitBytes(sparkSession, selectRanges)
    val totalCores = SparkResourceUtil.getTotalCores(relation.sparkSession.sessionState.conf)
    val isAllSmallFiles = selectRanges.forall(_.size < maxSplitBytes)
    val fileCntThreshold = relation.sparkSession.sessionState.conf
      .getConfString(
        CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD,
        CHBackendSettings.GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT
      )
      .toInt
    val totalMarksThreshold = totalCores * fileCntThreshold
    if (fileCntThreshold > 0 && isAllSmallFiles && selectRanges.size <= totalMarksThreshold) {
      // Spread the ranges evenly over the cores, with at least one range per partition.
      val fileCnt = math.max(1, math.round((selectRanges.size * 1.0) / totalCores).toInt)
      // Each range becomes exactly one split covering all of its marks.
      val splitFiles = selectRanges
        .map {
          part =>
            MergeTreePartSplit(
              part.name,
              part.dirName,
              part.targetNode,
              part.start,
              part.marks,
              part.size
            )
        }
      genInputPartitionSeqByFileCnt(
        engine,
        database,
        tableName,
        snapshotId,
        relativeTablePath,
        absoluteTablePath,
        tableSchema,
        partitions,
        table,
        clickhouseTableConfigs,
        splitFiles,
        fileCnt
      )
    } else {
      val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
      val totalMarks = selectRanges.map(p => p.marks).sum
      val totalBytes = selectRanges.map(p => p.size).sum
      // maxSplitBytes / (total_Bytes / total_marks) + 1
      // When the ranges report no bytes at all the average mark size is undefined and the
      // division would throw ArithmeticException; fall back to a step larger than any single
      // range so that each part stays in one split and the step remains strictly positive.
      val markCntPerPartition =
        if (totalBytes > 0) maxSplitBytes * totalMarks / totalBytes + 1 else totalMarks + 1

      logInfo(s"Planning scan with bin packing, max mark: $markCntPerPartition")
      val splitFiles = selectRanges
        .flatMap {
          part =>
            val end = part.marks + part.start
            // Walk the range in steps of markCntPerPartition; the last chunk may be shorter.
            (part.start until end by markCntPerPartition).map {
              offset =>
                val remaining = end - offset
                val marksInSplit =
                  if (remaining > markCntPerPartition) markCntPerPartition else remaining
                MergeTreePartSplit(
                  part.name,
                  part.dirName,
                  part.targetNode,
                  offset,
                  marksInSplit,
                  // Bytes are apportioned by the share of marks. part.marks is non-zero here
                  // because an empty range produces no offsets.
                  marksInSplit * part.size / part.marks
                )
            }
        }

      val (partNameWithLocation, locationDistinct) =
        calculatedLocationForSoftAffinity(splitFiles, relativeTablePath)

      genInputPartitionSeqBySplitFiles(
        engine,
        database,
        tableName,
        snapshotId,
        relativeTablePath,
        absoluteTablePath,
        tableSchema,
        partitions,
        table,
        clickhouseTableConfigs,
        splitFiles,
        openCostInBytes,
        maxSplitBytes,
        partNameWithLocation,
        locationDistinct
      )
    }
  }