def gc()

in backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala [108:370]


  def gc(
      spark: SparkSession,
      deltaLog: DeltaLog,
      dryRun: Boolean = true,
      retentionHours: Option[Double] = None,
      clock: Clock = new SystemClock): DataFrame = {
    recordDeltaOperation(deltaLog, "delta.gc") {

      val path = deltaLog.dataPath
      val deltaHadoopConf = deltaLog.newDeltaHadoopConf()
      val fs = path.getFileSystem(deltaHadoopConf)

      import org.apache.spark.sql.delta.implicits._

      val snapshot = deltaLog.update()

      require(snapshot.version >= 0, "No state defined for this table. Is this really " +
        "a Delta table? Refusing to garbage collect.")

      // --- modified start
      val isMergeTreeFormat = ClickHouseConfig
        .isMergeTreeFormatEngine(deltaLog.unsafeVolatileMetadata.configuration)
      // --- modified end

      DeletionVectorUtils.assertDeletionVectorsNotReadable(
        spark, snapshot.metadata, snapshot.protocol)

      val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
      val retentionMillis = retentionHours.map(h => TimeUnit.HOURS.toMillis(math.round(h)))
      checkRetentionPeriodSafety(spark, retentionMillis, snapshotTombstoneRetentionMillis)

      val deleteBeforeTimestamp = retentionMillis.map { millis =>
        clock.getTimeMillis() - millis
      }.getOrElse(snapshot.minFileRetentionTimestamp)
      // --- modified start: toGMTString is a deprecated function
      logInfo(s"Starting garbage collection (dryRun = $dryRun) of untracked files older than " +
        s"${new Date(deleteBeforeTimestamp).toString} in $path")
      // --- modified end
      val hadoopConf = spark.sparkContext.broadcast(
        new SerializableConfiguration(deltaHadoopConf))
      val basePath = fs.makeQualified(path).toString
      var isBloomFiltered = false
      val parallelDeleteEnabled =
        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_ENABLED)
      val parallelDeletePartitions =
        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_PARALLEL_DELETE_PARALLELISM)
        .getOrElse(spark.sessionState.conf.numShufflePartitions)
      val relativizeIgnoreError =
        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)
      val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()

      // --- modified start
      val originalEnabledGluten =
        spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
      // Gluten cannot execute the VACUUM command, so disable it for this thread while
      // vacuum runs; the original value is restored in the finally block below.
      spark.sparkContext
        .setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
      // --- modified end

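      // Collect, per live FileAction in the snapshot, the relative paths (and any parent
      // subdirectories) that are still referenced; tombstones older than the cutoff are
      // dropped so their files become eligible for deletion.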
      val validFiles = snapshot.stateDS
        .mapPartitions { actions =>
          val reservoirBase = new Path(basePath)
          val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
          actions.flatMap {
            _.unwrap match {
              case tombstone: RemoveFile if tombstone.delTimestamp < deleteBeforeTimestamp =>
                Nil
              case fa: FileAction =>
                getValidRelativePathsAndSubdirs(
                  fa,
                  fs,
                  reservoirBase,
                  relativizeIgnoreError,
                  isBloomFiltered)
              case _ => Nil
            }
          }
        }.toDF("path")

      val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
      val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism

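      // Recursively list every file and directory under the table root, skipping hidden
      // paths, and keep only the newest entry when the listing yields duplicate paths.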
      val allFilesAndDirs = DeltaFileOperations.recursiveListDirs(
          spark,
          Seq(basePath),
          hadoopConf,
          hiddenDirNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
          hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
          fileListingParallelism = Option(parallelism)
        )
        .groupByKey(_.path)
        .mapGroups { (k, v) =>
          val duplicates = v.toSeq
          // Of the duplicate listings for a path, keep only the newest entry.
          duplicates.maxBy(_.modificationTime)
        }

      try {
        allFilesAndDirs.cache()

        implicit val fileNameAndSizeEncoder = org.apache.spark.sql.Encoders.product[FileNameAndSize]

        val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path

        // The logic below is as follows:
        //   1. We take all the files and directories listed in our reservoir
        //   2. We filter all files older than our tombstone retention period and directories
        //   3. We get the subdirectories of all files so that we can find non-empty directories
        //   4. We groupBy each path, and count to get how many files are in each sub-directory
        //   5. We subtract all the valid files and tombstones in our state
        //   6. We filter all paths with a count of 1, which will correspond to files not in the
        //      state, and empty directories. We can safely delete all of these
        // --- modified start
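        // MergeTree tables store each data part as a directory of files, so candidates are
        // additionally grouped by their parent directory and the anti-join against
        // validFiles is performed on that directory rather than on the file path itself.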
        val diff = if (isMergeTreeFormat) {
          val diff_tmp = allFilesAndDirs
            .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
            .mapPartitions { fileStatusIterator =>
              val reservoirBase = new Path(basePath)
              val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
              fileStatusIterator.flatMap { fileStatus =>
                if (fileStatus.isDir) {
                  Iterator.single(FileNameAndSize(
                    relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true),
                    0L,
                    true))
                } else {
                  val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
                  val dirsWithSlash = dirs.map { p =>
                    val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
                    FileNameAndSize(relativizedPath, 0L, true)
                  }
                  dirsWithSlash ++ Iterator(
                    FileNameAndSize(relativize(
                      fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
                      fileStatus.length))
                }
              }
            }
            .withColumn(
              "dir",
              when(col("isDir"), col("path"))
                .otherwise(expr("substring_index(path, '/',size(split(path, '/')) -1)")))
            .groupBy(col("path"), col("dir"))
            .agg(count(new Column("*")).as("count"), sum("length").as("length"))

          diff_tmp
            .join(validFiles, diff_tmp("dir") === validFiles("path"), "leftanti")
            .where(col("count") === 1)
        } else {
          allFilesAndDirs
            .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
            .mapPartitions { fileStatusIterator =>
              val reservoirBase = new Path(basePath)
              val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
              fileStatusIterator.flatMap { fileStatus =>
                if (fileStatus.isDir) {
                  Iterator.single(FileNameAndSize(
                    relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
                } else {
                  val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
                  val dirsWithSlash = dirs.map { p =>
                    val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
                    FileNameAndSize(relativizedPath, 0L)
                  }
                  dirsWithSlash ++ Iterator(
                    FileNameAndSize(relativize(
                      fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
                      fileStatus.length))
                }
              }
            }
            .groupBy(col("path"))
            .agg(count(new Column("*")).as("count"), sum("length").as("length"))
            .join(validFiles, Seq("path"), "leftanti")
            .where(col("count") === 1)
        }
        // --- modified end

        val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first
        val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
          0L
        } else {
          sizeOfDataToDeleteRow.getLong(0)
        }

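        // Map the surviving relative paths back to absolute paths; these are the files and
        // empty directories that vacuum is allowed to remove.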
        val diffFiles = diff
          .select(col("path"))
          .as[String]
          .map { relativePath =>
            assert(!stringToPath(relativePath).isAbsolute,
              "Shouldn't have any absolute paths for deletion here.")
            pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
          }
        val timeTakenToIdentifyEligibleFiles =
          System.currentTimeMillis() - startTimeToIdentifyEligibleFiles

        val numFiles = diffFiles.count()
        if (dryRun) {
          val stats = DeltaVacuumStats(
            isDryRun = true,
            specifiedRetentionMillis = retentionMillis,
            defaultRetentionMillis = snapshotTombstoneRetentionMillis,
            minRetainedTimestamp = deleteBeforeTimestamp,
            dirsPresentBeforeDelete = dirCounts,
            objectsDeleted = numFiles,
            sizeOfDataToDelete = sizeOfDataToDelete,
            timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
            timeTakenForDelete = 0L)

          recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
          logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
            s"a total of $dirCounts directories that are safe to delete.")

          return diffFiles.map(f => stringToPath(f).toString).toDF("path")
        }
        logVacuumStart(
          spark,
          deltaLog,
          path,
          diffFiles,
          sizeOfDataToDelete,
          retentionMillis,
          snapshotTombstoneRetentionMillis)

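        // Perform the actual deletion, optionally distributed across the cluster when
        // parallel delete is enabled; on failure a vacuum-end event is still emitted.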
        val deleteStartTime = System.currentTimeMillis()
        val filesDeleted = try {
          delete(diffFiles, spark, basePath,
            hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
        } catch {
          case t: Throwable =>
            logVacuumEnd(deltaLog, spark, path)
            throw t
        }
        val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
        val stats = DeltaVacuumStats(
          isDryRun = false,
          specifiedRetentionMillis = retentionMillis,
          defaultRetentionMillis = snapshotTombstoneRetentionMillis,
          minRetainedTimestamp = deleteBeforeTimestamp,
          dirsPresentBeforeDelete = dirCounts,
          objectsDeleted = filesDeleted,
          sizeOfDataToDelete = sizeOfDataToDelete,
          timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
          timeTakenForDelete = timeTakenForDelete)
        recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
        logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))

        spark.createDataset(Seq(basePath)).toDF("path")
      } finally {
        allFilesAndDirs.unpersist()

        // --- modified start
        if (originalEnabledGluten != null) {
          spark.sparkContext.setLocalProperty(
            GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, originalEnabledGluten)
        } else {
          spark.sparkContext.setLocalProperty(
            GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "true")
        }
        // --- modified end
      }
    }
  }
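
A minimal dry-run sketch of how this command could be invoked. The table path and the
DeltaLog.forTable lookup are illustrative assumptions; only the gc signature above comes
from the source.

  // Hypothetical invocation from an existing SparkSession; the path is a placeholder.
  val deltaLog = DeltaLog.forTable(spark, "/path/to/delta/table")
  val deletable = VacuumCommand.gc(
    spark,
    deltaLog,
    dryRun = true,                 // only report what would be removed
    retentionHours = Some(168.0))  // files older than 7 days become eligible
  deletable.show(truncate = false) // one row per path that would be deleted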