def gc()

in backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala [210:473]


  def gc(
      spark: SparkSession,
      deltaLog: DeltaLog,
      dryRun: Boolean = true,
      retentionHours: Option[Double] = None,
      inventory: Option[DataFrame] = None,
      clock: Clock = new SystemClock): DataFrame = {
    recordDeltaOperation(deltaLog, "delta.gc") {

      val vacuumStartTime = System.currentTimeMillis()
      val path = deltaLog.dataPath
      val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
      val fs = path.getFileSystem(deltaHadoopConf)

      import org.apache.spark.sql.delta.implicits._

      val snapshot = deltaLog.update()
      deltaLog.protocolWrite(snapshot.protocol)

      // --- modified start
      val isMergeTreeFormat = ClickHouseConfig
        .isMergeTreeFormatEngine(deltaLog.unsafeVolatileMetadata.configuration)
      // --- modified end

      val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
      val retentionMillis = retentionHours.map(h => TimeUnit.HOURS.toMillis(math.round(h)))
      val deleteBeforeTimestamp = retentionMillis match {
        case Some(millis) => clock.getTimeMillis() - millis
        case _ => snapshot.minFileRetentionTimestamp
      }
      // --- modified start: toGMTString is a deprecated function
      logInfo(s"Starting garbage collection (dryRun = $dryRun) of untracked files older than " +
        s"${new Date(deleteBeforeTimestamp).toString} in $path")
      // --- modified end
      val hadoopConf = spark.sparkContext.broadcast(
        new SerializableConfiguration(deltaHadoopConf))
      val basePath = fs.makeQualified(path).toString
      val parallelDeleteEnabled =
        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)
      val parallelDeletePartitions =
        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_PARALLELISM)
        .getOrElse(spark.sessionState.conf.numShufflePartitions)
      val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()

      // --- modified start
      val originalEnabledGluten =
        spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
      // gluten can not support vacuum command
      spark.sparkContext
        .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
      // --- modified end

      val validFiles =
        getValidFilesFromSnapshot(
          spark,
          basePath,
          snapshot,
          retentionMillis,
          hadoopConf,
          clock,
          checkAbsolutePathOnly = false)

      val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
      val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism
      val allFilesAndDirsWithDuplicates = inventory match {
        case Some(inventoryDF) => getFilesFromInventory(basePath, partitionColumns, inventoryDF)
        case None => DeltaFileOperations.recursiveListDirs(
          spark,
          Seq(basePath),
          hadoopConf,
          hiddenDirNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
          hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
          fileListingParallelism = Option(parallelism)
        )
      }
      val allFilesAndDirs = allFilesAndDirsWithDuplicates.groupByKey(_.path)
        .mapGroups { (k, v) =>
          val duplicates = v.toSeq
          // of all the duplicates we can return the newest file.
          duplicates.maxBy(_.modificationTime)
        }

      recordFrameProfile("Delta", "VacuumCommand.gc") {
        try {
          allFilesAndDirs.cache()

          implicit val fileNameAndSizeEncoder =
            org.apache.spark.sql.Encoders.product[FileNameAndSize]

          val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
          val filesAndDirsPresentBeforeDelete = allFilesAndDirs.count()

          // The logic below is as follows:
          //   1. We take all the files and directories listed in our reservoir
          //   2. We filter all files older than our tombstone retention period and directories
          //   3. We get the subdirectories of all files so that we can find non-empty directories
          //   4. We groupBy each path, and count to get how many files are in each sub-directory
          //   5. We subtract all the valid files and tombstones in our state
          //   6. We filter all paths with a count of 1, which will correspond to files not in the
          //      state, and empty directories. We can safely delete all of these
          // --- modified start
          val diff = if (isMergeTreeFormat) {
            val diff_tmp = allFilesAndDirs
              .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
              .mapPartitions { fileStatusIterator =>
                val reservoirBase = new Path(basePath)
                val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
                fileStatusIterator.flatMap { fileStatus =>
                  if (fileStatus.isDir) {
                    Iterator.single(FileNameAndSize(
                      relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true),
                      0L,
                      true))
                  } else {
                    val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
                    val dirsWithSlash = dirs.map { p =>
                      val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
                      FileNameAndSize(relativizedPath, 0L, true)
                    }
                    dirsWithSlash ++ Iterator(
                      FileNameAndSize(relativize(
                        fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
                        fileStatus.length))
                  }
                }
              }
              .withColumn(
                "dir",
                when(col("isDir"), col("path"))
                  .otherwise(expr("substring_index(path, '/',size(split(path, '/')) -1)")))
              .groupBy(col("path"), col("dir"))
              .agg(count(new Column("*")).as("count"), sum("length").as("length"))

            diff_tmp
              .join(validFiles, diff_tmp("dir") === validFiles("path"), "leftanti")
              .where(col("count") === 1)
          } else {
            allFilesAndDirs
              .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
              .mapPartitions { fileStatusIterator =>
                val reservoirBase = new Path(basePath)
                val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
                fileStatusIterator.flatMap { fileStatus =>
                  if (fileStatus.isDir) {
                    Iterator.single(FileNameAndSize(
                      relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
                  } else {
                    val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
                    val dirsWithSlash = dirs.map { p =>
                      val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
                      FileNameAndSize(relativizedPath, 0L)
                    }
                    dirsWithSlash ++ Iterator(
                      FileNameAndSize(relativize(
                        fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
                        fileStatus.length))
                  }
                }
              }.groupBy(col("path")).agg(count(new Column("*")).as("count"),
              sum("length").as("length"))
              .join(validFiles, Seq("path"), "leftanti")
              .where(col("count") === 1)
          }
          // --- modified end

          val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first()
          val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
            0L
          } else {
            sizeOfDataToDeleteRow.getLong(0)
          }

          val diffFiles = diff
            .select(col("path"))
            .as[String]
            .map { relativePath =>
              assert(!stringToPath(relativePath).isAbsolute,
                "Shouldn't have any absolute paths for deletion here.")
              pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
            }
          val timeTakenToIdentifyEligibleFiles =
            System.currentTimeMillis() - startTimeToIdentifyEligibleFiles


          val numFiles = diffFiles.count()
          if (dryRun) {
            val stats = DeltaVacuumStats(
              isDryRun = true,
              specifiedRetentionMillis = retentionMillis,
              defaultRetentionMillis = snapshotTombstoneRetentionMillis,
              minRetainedTimestamp = deleteBeforeTimestamp,
              dirsPresentBeforeDelete = dirCounts,
              filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
              objectsDeleted = numFiles,
              sizeOfDataToDelete = sizeOfDataToDelete,
              timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
              timeTakenForDelete = 0L,
              vacuumStartTime = vacuumStartTime,
              vacuumEndTime = System.currentTimeMillis,
              numPartitionColumns = partitionColumns.size
            )

            recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
            logInfo(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
              s"a total of $dirCounts directories that are safe to delete. Vacuum stats: $stats")

            return diffFiles.map(f => stringToPath(f).toString).toDF("path")
          }
          logVacuumStart(
            spark,
            deltaLog,
            path,
            diffFiles,
            sizeOfDataToDelete,
            retentionMillis,
            snapshotTombstoneRetentionMillis)

          val deleteStartTime = System.currentTimeMillis()
          val filesDeleted = try {
            delete(diffFiles, spark, basePath,
              hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
          } catch {
            case t: Throwable =>
              logVacuumEnd(deltaLog, spark, path)
              throw t
          }
          val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
          val stats = DeltaVacuumStats(
            isDryRun = false,
            specifiedRetentionMillis = retentionMillis,
            defaultRetentionMillis = snapshotTombstoneRetentionMillis,
            minRetainedTimestamp = deleteBeforeTimestamp,
            dirsPresentBeforeDelete = dirCounts,
            filesAndDirsPresentBeforeDelete = filesAndDirsPresentBeforeDelete,
            objectsDeleted = filesDeleted,
            sizeOfDataToDelete = sizeOfDataToDelete,
            timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
            timeTakenForDelete = timeTakenForDelete,
            vacuumStartTime = vacuumStartTime,
            vacuumEndTime = System.currentTimeMillis,
            numPartitionColumns = partitionColumns.size)
          recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
          logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
          logInfo(s"Deleted $filesDeleted files ($sizeOfDataToDelete bytes) and directories in " +
            s"a total of $dirCounts directories. Vacuum stats: $stats")


          spark.createDataset(Seq(basePath)).toDF("path")
        } finally {
          allFilesAndDirs.unpersist()

          // --- modified start
          if (originalEnabledGluten != null) {
            spark.sparkContext.setLocalProperty(
              GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
          } else {
            spark.sparkContext.setLocalProperty(
              GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
          }
          // --- modified end
        }
      }
    }
  }