in paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala [54:181]
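
// Distributed orphan-file cleanup. Returns a dataset of (deleted file count, deleted bytes)
// pairs (one per cleaning task, plus one row for the driver-local snapshot-directory cleanup
// when it deleted anything) together with the dataset of manifest-related files that are
// still referenced by valid snapshots.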
def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile]) = {
  import spark.implicits._

  val branches = validBranches()
  val deletedFilesCountInLocal = new AtomicLong(0)
  val deletedFilesLenInBytesInLocal = new AtomicLong(0)
  // snapshot and changelog files are the root of everything, so they are handled specially
  // here, and subsequently, we will not count their orphan files.
  cleanSnapshotDir(
    branches,
    (_: Path) => deletedFilesCountInLocal.incrementAndGet,
    size => deletedFilesLenInBytesInLocal.addAndGet(size))
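
  // the snapshot/changelog directory cleanup above runs on the driver, so its counters
  // live in AtomicLongs and are merged into the distributed result at the end of this method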

  val maxBranchParallelism = Math.min(branches.size(), parallelism)
  // for each branch, collect its snapshots; for each snapshot, collect the manifest-related
  // files it references (manifest, index, and statistics files)
  val usedManifestFiles = spark.sparkContext
    .parallelize(branches.asScala.toSeq, maxBranchParallelism)
    .mapPartitions(_.flatMap {
      branch => safelyGetAllSnapshots(branch).asScala.map(snapshot => (branch, snapshot.toJson))
    })
    .repartition(parallelism)
    .flatMap {
      case (branch, snapshotJson) =>
        val usedFileBuffer = new ArrayBuffer[BranchAndManifestFile]()
        val usedFileConsumer =
          new Consumer[org.apache.paimon.utils.Pair[String, java.lang.Boolean]] {
            override def accept(pair: utils.Pair[String, java.lang.Boolean]): Unit = {
              usedFileBuffer.append(BranchAndManifestFile(branch, pair.getLeft, pair.getRight))
            }
          }
        val snapshot = Snapshot.fromJson(snapshotJson)
        collectWithoutDataFileWithManifestFlag(branch, snapshot, usedFileConsumer)
        usedFileBuffer
    }
    .toDS()
    .cache()
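
  // the snapshot scan above is reused three times (data-file extraction, the used-file
  // union, and the returned dataset), hence the cache()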

  // find all data files
  val dataFiles = usedManifestFiles
    .filter(_.isManifestFile)
    .distinct()
    .mapPartitions {
      it =>
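        // cache one ManifestFile reader per branch so it can be reused for every manifest
        // processed in this partition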
        val branchManifests = new util.HashMap[String, ManifestFile]
        it.flatMap {
          branchAndManifestFile =>
            val manifestFile = branchManifests.computeIfAbsent(
              branchAndManifestFile.branch,
              (key: String) =>
                specifiedTable.switchToBranch(key).store.manifestFileFactory.create)
            retryReadingFiles(
              () => manifestFile.readWithIOException(branchAndManifestFile.manifestName),
              Collections.emptyList[ManifestEntry]
            ).asScala.flatMap {
              manifestEntry =>
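                // a manifest entry is counted as "used" through its data file name and any
                // extra files attached to that data file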
                manifestEntry.fileName() +: manifestEntry.file().extraFiles().asScala
            }
        }
    }

  // union manifest and data files
  val usedFiles = usedManifestFiles
    .map(_.manifestName)
    .union(dataFiles)
    .toDF("used_name")

  // find candidate files which can be removed
  val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString).toSeq
  val maxFileDirsParallelism = Math.min(fileDirs.size, parallelism)
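  // list each table directory in parallel, keep only files old enough to be orphan
  // candidates, and record (name, path, length, parent dir) for every one of them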
  val candidates = spark.sparkContext
    .parallelize(fileDirs, maxFileDirsParallelism)
    .flatMap {
      dir =>
        tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
          file =>
            val path = file.getPath
            (path.getName, path.toUri.toString, file.getLen, path.getParent.toUri.toString)
        }
    }
    .toDF("name", "path", "len", "dataDir")
    .repartition(parallelism)

  // a left anti join keeps only the candidate files whose names never appear in the used-file set
  val deleted = candidates
    .join(usedFiles, $"name" === $"used_name", "left_anti")
    .repartition($"dataDir")
    .mapPartitions {
      it =>
        var deletedFilesCount = 0L
        var deletedFilesLenInBytes = 0L
        val dataDirs = new mutable.HashSet[String]()
        while (it.hasNext) {
          val fileInfo = it.next()
          val pathToClean = fileInfo.getString(1)
          val deletedPath = new Path(pathToClean)
          deletedFilesLenInBytes += fileInfo.getLong(2)
          cleanFile(deletedPath)
          logInfo(s"Cleaned file: $pathToClean")
          dataDirs.add(fileInfo.getString(3))
          deletedFilesCount += 1
        }
        // clean up empty bucket directories (skipped in dry-run mode)
        if (!dryRun) {
          val bucketDirs = dataDirs
            .filter(_.contains(BUCKET_PATH_PREFIX))
            .map(new Path(_))
          tryCleanDataDirectory(bucketDirs.asJava, partitionKeysNum + 1)
        }
        logInfo(
          s"Total cleaned files: $deletedFilesCount, total cleaned files length: $deletedFilesLenInBytes")
        Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
    }
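
  // merge counts from the driver-local snapshot/changelog cleanup (if any) into the
  // distributed result so the caller sees a single dataset of (count, bytes) pairs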
  val finalDeletedDataset =
    if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
      deleted.union(
        spark.createDataset(
          Seq((deletedFilesCountInLocal.get(), deletedFilesLenInBytesInLocal.get()))))
    } else {
      deleted
    }

  (finalDeletedDataset, usedManifestFiles)
}
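
// A minimal sketch (not part of the file above) of how a caller might reduce the
// per-partition (count, bytes) pairs returned by doOrphanClean into one total.
// `deletedDataset`, `totalDeletedCount`, and `totalDeletedBytes` are illustrative
// names assumed here, not Paimon API.
val (deletedDataset, _) = doOrphanClean()
val (totalDeletedCount, totalDeletedBytes) = deletedDataset
  .collect()
  .foldLeft((0L, 0L)) { case ((cnt, len), (c, l)) => (cnt + c, len + l) }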