def scanMergeTreePartsToAddFile()

in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/utils/ScanMergeTreePartsUtils.scala [33:157]


  /**
   * Scans MergeTree part directories under the table path and converts each recognised part
   * into an [[AddFile]]. When any were found, it commits them to the table's Delta log in a
   * single `Append` write transaction.
   *
   * A directory is treated as a part only when all of these hold:
   *  - its name splits on `_` into exactly four fields
   *    `<partitionId>_<minBlockNum>_<maxBlockNum>_<level>`;
   *  - the partition id is non-empty;
   *  - the remaining three fields are numeric.
   * Any other directory is skipped.
   *
   * Side effect: the Delta log file for version 1 is deleted (if it exists) before the actions
   * are built. This happens even when no part is found.
   *
   * @param configuration     Hadoop configuration used to resolve the scan path's file system
   * @param clickHouseTableV2 table whose `path` is scanned and whose Delta log is committed to
   * @param pathFilter        suffix (glob patterns allowed) appended to the table path
   * @param isPartition       when true, the part's parent directory name is parsed as a
   *                          partition path fragment and prefixed to the part's relative path
   * @param isBucketTable     when true (and `isPartition` is false), the parent directory
   *                          name is used as the bucket number and relative path prefix
   * @return the generated AddFile actions (committed when non-empty)
   */
  def scanMergeTreePartsToAddFile(
      configuration: Configuration,
      clickHouseTableV2: ClickHouseTableV2,
      pathFilter: String,
      isPartition: Boolean,
      isBucketTable: Boolean): Seq[AddFile] = {
    // One MergeTree part directory found on disk, with everything needed to build its AddFile.
    case class PartDir(
        relativePath: String,
        partitionId: String,
        bucketNum: String,
        minBlockNum: Long,
        maxBlockNum: Long,
        level: Int,
        sizeInBytes: Long,
        modificationTime: Long,
        partitionValues: Map[String, String])

    // Parses `<partitionId>_<minBlockNum>_<maxBlockNum>_<level>`; None for any other shape.
    def parsePartName(name: String): Option[(String, Long, Long, Int)] =
      name.split("_") match {
        case Array(partitionId, minBlock, maxBlock, level) if partitionId.nonEmpty =>
          try Some((partitionId, minBlock.toLong, maxBlock.toLong, level.toInt))
          catch {
            case _: NumberFormatException =>
              logWarning(s"skip directory with non-numeric part name fields: $name")
              None
          }
        case _ => None
      }

    // scan parts dir; globStatus returns null (not an empty array) when a non-glob
    // path does not exist, so normalise that to "no matches".
    val scanPath = new Path(clickHouseTableV2.path + pathFilter)
    val fs = scanPath.getFileSystem(configuration)
    val fileGlobStatuses = Option(fs.globStatus(scanPath)).map(_.toSeq).getOrElse(Seq.empty)
    val allDirSummary = fileGlobStatuses
      .filter(_.isDirectory)
      .flatMap {
        status =>
          logInfo(s"scan merge tree parts: ${status.getPath.toString}")
          val filePath = status.getPath
          val pathName = filePath.getName
          parsePartName(pathName).map {
            case (partitionId, minBlockNum, maxBlockNum, level) =>
              // NOTE(review): only the immediate parent directory name is used, so with
              // multi-column partitioning only the innermost level would be captured —
              // confirm the pathFilter layout guarantees a single partition level.
              val (relativePath, bucketNum, partitionValues) =
                if (isPartition) {
                  val partitionPath = filePath.getParent.getName
                  val values = PartitioningUtils
                    .parsePathFragmentAsSeq(partitionPath)
                    .toMap[String, String]
                  (partitionPath + "/" + pathName, "", values)
                } else if (isBucketTable) {
                  val bucketPath = filePath.getParent.getName
                  (bucketPath + "/" + pathName, bucketPath, Map.empty[String, String])
                } else {
                  (pathName, "", Map.empty[String, String])
                }
              // getContentSummary summarises the whole subtree, so only pay for real parts.
              PartDir(
                relativePath,
                partitionId,
                bucketNum,
                minBlockNum,
                maxBlockNum,
                level,
                fs.getContentSummary(filePath).getLength,
                status.getModificationTime,
                partitionValues)
          }
      }

    // generate CommitInfo and AddFile
    val versionFileName = FileNames.deltaFile(clickHouseTableV2.deltaLog.logPath, 1)
    if (fs.exists(versionFileName)) {
      fs.delete(versionFileName, false)
    }
    val finalActions: Seq[AddFile] =
      if (allDirSummary.isEmpty) {
        Seq.empty[AddFile]
      } else {
        // Table-level values are identical for every part; look them up once, and only
        // after the version file deletion above (snapshot is read from the Delta log).
        val database = clickHouseTableV2.catalogTable.get.identifier.database.get
        val table = clickHouseTableV2.catalogTable.get.identifier.table
        val engine = clickHouseTableV2.snapshot.metadata.configuration("engine")
        val dataPath = clickHouseTableV2.deltaLog.dataPath.toString
        allDirSummary.map {
          part =>
            AddFileTags.partsInfoToAddFile(
              database,
              table,
              engine,
              dataPath + "/" + part.relativePath,
              "",
              part.relativePath,
              "",
              0L,
              // the part's total size is passed for three consecutive arguments
              part.sizeInBytes,
              part.sizeInBytes,
              part.sizeInBytes,
              part.modificationTime,
              part.partitionId,
              part.minBlockNum,
              part.maxBlockNum,
              part.level,
              // minBlockNum is intentionally passed a second time here
              // (presumably as the data version — confirm against partsInfoToAddFile)
              part.minBlockNum,
              part.bucketNum,
              part.relativePath,
              dataChange = true,
              partitionValues = part.partitionValues
            )
        }
      }
    if (finalActions.nonEmpty) {
      // write transaction log
      logInfo(s"starting to generate commit info, finalActions.length=${finalActions.length} .")
      clickHouseTableV2.deltaLog.withNewTransaction {
        txn =>
          val operation =
            DeltaOperations.Write(SaveMode.Append, Option(Seq.empty[String]), None, None)
          txn.commit(finalActions, operation)
      }
    }
    finalActions
  }