def doOrphanClean()

in paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala [54:181]


  def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile]) = {
    import spark.implicits._

    val branches = validBranches()
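    // counters for files deleted locally on the driver while cleaning the snapshot/changelog dirs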
    val deletedFilesCountInLocal = new AtomicLong(0)
    val deletedFilesLenInBytesInLocal = new AtomicLong(0)
    // snapshot and changelog files are the root of everything, so they are handled specially
    // here; their orphan files are not counted again in the distributed pass below.
    cleanSnapshotDir(
      branches,
      (_: Path) => deletedFilesCountInLocal.incrementAndGet,
      size => deletedFilesLenInBytesInLocal.addAndGet(size))

    val maxBranchParallelism = Math.min(branches.size(), parallelism)
    // find snapshots per branch and manifests (manifest, index, statistics) per snapshot
    val usedManifestFiles = spark.sparkContext
      .parallelize(branches.asScala.toSeq, maxBranchParallelism)
      .mapPartitions(_.flatMap {
        branch => safelyGetAllSnapshots(branch).asScala.map(snapshot => (branch, snapshot.toJson))
      })
      .repartition(parallelism)
      .flatMap {
        case (branch, snapshotJson) =>
          val usedFileBuffer = new ArrayBuffer[BranchAndManifestFile]()
          val usedFileConsumer =
            new Consumer[org.apache.paimon.utils.Pair[String, java.lang.Boolean]] {
              override def accept(pair: utils.Pair[String, java.lang.Boolean]): Unit = {
                usedFileBuffer.append(BranchAndManifestFile(branch, pair.getLeft, pair.getRight))
              }
            }
          val snapshot = Snapshot.fromJson(snapshotJson)
          collectWithoutDataFileWithManifestFlag(branch, snapshot, usedFileConsumer)
          usedFileBuffer
      }
      .toDS()
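      // cached because this dataset is consumed twice: to resolve data files below and as a return value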
      .cache()

    // find all data files
    val dataFiles = usedManifestFiles
      .filter(_.isManifestFile)
      .distinct()
      .mapPartitions {
        it =>
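          // reuse a single ManifestFile reader per branch within this partition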
          val branchManifests = new util.HashMap[String, ManifestFile]
          it.flatMap {
            branchAndManifestFile =>
              val manifestFile = branchManifests.computeIfAbsent(
                branchAndManifestFile.branch,
                (key: String) =>
                  specifiedTable.switchToBranch(key).store.manifestFileFactory.create)

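              // retry manifest reads; fall back to an empty list if the manifest cannot be read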
              retryReadingFiles(
                () => manifestFile.readWithIOException(branchAndManifestFile.manifestName),
                Collections.emptyList[ManifestEntry]
              ).asScala.flatMap {
                manifestEntry =>
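                  // the data file name plus any extra files attached to it are all treated as used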
                  manifestEntry.fileName() +: manifestEntry.file().extraFiles().asScala
              }
          }
      }

    // union manifest and data files
    val usedFiles = usedManifestFiles
      .map(_.manifestName)
      .union(dataFiles)
      .toDF("used_name")

    // find candidate files which can be removed
    val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString).toSeq
    val maxFileDirsParallelism = Math.min(fileDirs.size, parallelism)
    val candidates = spark.sparkContext
      .parallelize(fileDirs, maxFileDirsParallelism)
      .flatMap {
        dir =>
          tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
            file =>
              val path = file.getPath
              (path.getName, path.toUri.toString, file.getLen, path.getParent.toUri.toString)
          }
      }
      .toDF("name", "path", "len", "dataDir")
      .repartition(parallelism)

    // use a left anti join to keep only the files that are not used
    val deleted = candidates
      .join(usedFiles, $"name" === $"used_name", "left_anti")
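      // co-locate files that share a parent directory so the empty-directory cleanup below
      // sees all of a directory's deleted files in one partition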
      .repartition($"dataDir")
      .mapPartitions {
        it =>
          var deletedFilesCount = 0L
          var deletedFilesLenInBytes = 0L
          val dataDirs = new mutable.HashSet[String]()

          while (it.hasNext) {
            val fileInfo = it.next()
            val pathToClean = fileInfo.getString(1)
            val deletedPath = new Path(pathToClean)
            deletedFilesLenInBytes += fileInfo.getLong(2)
            cleanFile(deletedPath)
            logInfo(s"Cleaned file: $pathToClean")
            dataDirs.add(fileInfo.getString(3))
            deletedFilesCount += 1
          }

          // clean empty directory
          if (!dryRun) {
            val bucketDirs = dataDirs
              .filter(_.contains(BUCKET_PATH_PREFIX))
              .map(new Path(_))
            tryCleanDataDirectory(bucketDirs.asJava, partitionKeysNum + 1)
          }

          logInfo(
            s"Total cleaned files: $deletedFilesCount, total cleaned files length in bytes: $deletedFilesLenInBytes")
          Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
      }

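    // merge the driver-side deletions (snapshot/changelog dirs) with the distributed deletions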
    val finalDeletedDataset =
      if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
        deleted.union(
          spark.createDataset(
            Seq((deletedFilesCountInLocal.get(), deletedFilesLenInBytesInLocal.get()))))
      } else {
        deleted
      }

    (finalDeletedDataset, usedManifestFiles)
  }
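
The returned datasets are lazy, so the distributed cleaning only runs when the first dataset is consumed, and the second (cached) dataset should be unpersisted once the caller is done with it. A minimal caller sketch, assuming a SparkOrphanFilesClean instance named cleaner has already been constructed (the name is hypothetical, not part of the source), might look like this:

// hypothetical: cleaner is a SparkOrphanFilesClean created elsewhere
val (deleted, usedManifestFiles) = cleaner.doOrphanClean()

// Consuming the dataset triggers the cleaning; each row is a
// (deletedFilesCount, deletedFilesLenInBytes) pair emitted by one task.
val results = deleted.collect()
val totalCount = results.map(_._1).sum
val totalBytes = results.map(_._2).sum

// The manifest dataset was cached inside doOrphanClean; release it when done.
usedManifestFiles.unpersist()

println(s"Cleaned $totalCount orphan files ($totalBytes bytes)")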