in backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala [210:473]
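/**
 * Clears untracked files and empty directories under the table root that are older than the
 * retention threshold. When `dryRun` is true, no files are deleted and the returned DataFrame
 * contains the paths that would be removed; otherwise the files are deleted and the DataFrame
 * contains the table base path.
 */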
def gc(
spark: SparkSession,
deltaLog: DeltaLog,
dryRun: Boolean = true,
retentionHours: Option[Double] = None,
inventory: Option[DataFrame] = None,
clock: Clock = new SystemClock): DataFrame = {
recordDeltaOperation(deltaLog, "delta.gc") {
val vacuumStartTime = System.currentTimeMillis()
val path = deltaLog.dataPath
val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
val fs = path.getFileSystem(deltaHadoopConf)
import org.apache.spark.sql.delta.implicits._
val snapshot = deltaLog.update()
deltaLog.protocolWrite(snapshot.protocol)
// --- modified start
val isMergeTreeFormat = ClickHouseConfig
.isMergeTreeFormatEngine(deltaLog.unsafeVolatileMetadata.configuration)
// --- modified end
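// MergeTree tables are tracked at the directory (part) level rather than per file, so the
// candidate-file computation below uses a separate branch for them.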
val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
val retentionMillis = retentionHours.map(h => TimeUnit.HOURS.toMillis(math.round(h)))
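// Files last modified before this cut-off are deletion candidates: "now" minus the explicit
// retention if one was given, otherwise the snapshot's minimum file retention timestamp.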
val deleteBeforeTimestamp = retentionMillis match {
case Some(millis) => clock.getTimeMillis() - millis
case _ => snapshot.minFileRetentionTimestamp
}
// --- modified start: Date.toGMTString is deprecated, so format the timestamp with toString
logInfo(s"Starting garbage collection (dryRun = $dryRun) of untracked files older than " +
s"${new Date(deleteBeforeTimestamp).toString} in $path")
// --- modified end
val hadoopConf = spark.sparkContext.broadcast(
new SerializableConfiguration(deltaHadoopConf))
val basePath = fs.makeQualified(path).toString
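// Whether to delete files in parallel on executors, and with how many partitions
// (falling back to the shuffle partition count).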
val parallelDeleteEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)
val parallelDeletePartitions =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_PARALLELISM)
.getOrElse(spark.sessionState.conf.numShufflePartitions)
val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
// --- modified start
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
// Gluten does not support the vacuum command, so disable it for this thread while vacuuming
spark.sparkContext
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end
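// All paths the snapshot still considers valid (files referenced by the table state, including
// those protected by the retention window); nothing in this set may be deleted.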
val validFiles =
getValidFilesFromSnapshot(
spark,
basePath,
snapshot,
retentionMillis,
hadoopConf,
clock,
checkAbsolutePathOnly = false)
val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism
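// Either use the caller-supplied inventory DataFrame or recursively list everything under the
// table root, skipping hidden files and directories.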
val allFilesAndDirsWithDuplicates = inventory match {
case Some(inventoryDF) => getFilesFromInventory(basePath, partitionColumns, inventoryDF)
case None => DeltaFileOperations.recursiveListDirs(
spark,
Seq(basePath),
hadoopConf,
hiddenDirNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
fileListingParallelism = Option(parallelism)
)
}
val allFilesAndDirs = allFilesAndDirsWithDuplicates.groupByKey(_.path)
.mapGroups { (k, v) =>
val duplicates = v.toSeq
// Of the duplicate listings for a path, keep the entry with the latest modification time.
duplicates.maxBy(_.modificationTime)
}
recordFrameProfile("Delta", "VacuumCommand.gc") {
try {
allFilesAndDirs.cache()
implicit val fileNameAndSizeEncoder =
org.apache.spark.sql.Encoders.product[FileNameAndSize]
val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
val filesAndDirsPresentBeforeDelete = allFilesAndDirs.count()
// The logic below is as follows:
// 1. We take all the files and directories listed in our reservoir
// 2. We keep files older than the tombstone retention period, plus all directories
// 3. We get the subdirectories of all files so that we can find non-empty directories
// 4. We groupBy each path, and count to get how many files are in each sub-directory
// 5. We subtract all the valid files and tombstones in our state
// 6. We filter all paths with a count of 1, which will correspond to files not in the
// state, and empty directories. We can safely delete all of these
// --- modified start
val diff = if (isMergeTreeFormat) {
val diff_tmp = allFilesAndDirs
.where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
.mapPartitions { fileStatusIterator =>
val reservoirBase = new Path(basePath)
val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
fileStatusIterator.flatMap { fileStatus =>
if (fileStatus.isDir) {
Iterator.single(FileNameAndSize(
relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true),
0L,
true))
} else {
val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
val dirsWithSlash = dirs.map { p =>
val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
FileNameAndSize(relativizedPath, 0L, true)
}
dirsWithSlash ++ Iterator(
FileNameAndSize(relativize(
fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
fileStatus.length))
}
}
}
.withColumn(
"dir",
when(col("isDir"), col("path"))
.otherwise(expr("substring_index(path, '/',size(split(path, '/')) -1)")))
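// "dir" is the parent directory for files (the path with its last segment dropped) and the
// path itself for directories.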
.groupBy(col("path"), col("dir"))
.agg(count(new Column("*")).as("count"), sum("length").as("length"))
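// Keep only entries whose directory does not match any tracked path (leftanti) and that have
// no remaining children (count == 1).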
diff_tmp
.join(validFiles, diff_tmp("dir") === validFiles("path"), "leftanti")
.where(col("count") === 1)
} else {
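// Non-MergeTree tables: group by path, subtract everything still tracked in validFiles, and
// keep paths with count == 1 (untracked files and empty directories).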
allFilesAndDirs
.where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
.mapPartitions { fileStatusIterator =>
val reservoirBase = new Path(basePath)
val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
fileStatusIterator.flatMap { fileStatus =>
if (fileStatus.isDir) {
Iterator.single(FileNameAndSize(
relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
} else {
val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
val dirsWithSlash = dirs.map { p =>
val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
FileNameAndSize(relativizedPath, 0L)
}
dirsWithSlash ++ Iterator(
FileNameAndSize(relativize(
fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
fileStatus.length))
}
}
}.groupBy(col("path")).agg(count(new Column("*")).as("count"),
sum("length").as("length"))
.join(validFiles, Seq("path"), "leftanti")
.where(col("count") === 1)
}
// --- modified end
val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first()
val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
0L
} else {
sizeOfDataToDeleteRow.getLong(0)
}
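// Resolve the surviving relative paths back to absolute paths under the table root.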
val diffFiles = diff
.select(col("path"))
.as[String]
.map { relativePath =>
assert(!stringToPath(relativePath).isAbsolute,
"Shouldn't have any absolute paths for deletion here.")
pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
}
val timeTakenToIdentifyEligibleFiles =
System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
val numFiles = diffFiles.count()
if (dryRun) {
val stats = DeltaVacuumStats(
isDryRun = true,
specifiedRetentionMillis = retentionMillis,
defaultRetentionMillis = snapshotTombstoneRetentionMillis,
minRetainedTimestamp = deleteBeforeTimestamp,
dirsPresentBeforeDelete = dirCounts,
filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
objectsDeleted = numFiles,
sizeOfDataToDelete = sizeOfDataToDelete,
timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
timeTakenForDelete = 0L,
vacuumStartTime = vacuumStartTime,
vacuumEndTime = System.currentTimeMillis,
numPartitionColumns = partitionColumns.size
)
recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logInfo(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
s"a total of $dirCounts directories that are safe to delete. Vacuum stats: $stats")
return diffFiles.map(f => stringToPath(f).toString).toDF("path")
}
logVacuumStart(
spark,
deltaLog,
path,
diffFiles,
sizeOfDataToDelete,
retentionMillis,
snapshotTombstoneRetentionMillis)
val deleteStartTime = System.currentTimeMillis()
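// Perform the actual deletion; if it fails part-way, still record the vacuum end event
// before rethrowing.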
val filesDeleted = try {
delete(diffFiles, spark, basePath,
hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
} catch {
case t: Throwable =>
logVacuumEnd(deltaLog, spark, path)
throw t
}
val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
val stats = DeltaVacuumStats(
isDryRun = false,
specifiedRetentionMillis = retentionMillis,
defaultRetentionMillis = snapshotTombstoneRetentionMillis,
minRetainedTimestamp = deleteBeforeTimestamp,
dirsPresentBeforeDelete = dirCounts,
filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
objectsDeleted = filesDeleted,
sizeOfDataToDelete = sizeOfDataToDelete,
timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
timeTakenForDelete = timeTakenForDelete,
vacuumStartTime = vacuumStartTime,
vacuumEndTime = System.currentTimeMillis,
numPartitionColumns = partitionColumns.size)
recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
logInfo(s"Deleted $filesDeleted files ($sizeOfDataToDelete bytes) and directories in " +
s"a total of $dirCounts directories. Vacuum stats: $stats")
spark.createDataset(Seq(basePath)).toDF("path")
} finally {
allFilesAndDirs.unpersist()
// --- modified start
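// Restore the thread-local Gluten flag to its original value, or set it back to "true" when
// no value had been set before.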
if (originalEnabledGluten != null) {
spark.sparkContext.setLocalProperty(
GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
} else {
spark.sparkContext.setLocalProperty(
GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
}
// --- modified end
}
}
}
}
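// Example invocation (illustrative sketch, not part of this file; the table path is hypothetical):
//   val deltaLog = DeltaLog.forTable(spark, "/path/to/delta/table")
//   val candidates = VacuumCommand.gc(spark, deltaLog, dryRun = true, retentionHours = Some(168.0))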