override def output: Seq[Attribute] = Seq()

in backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala [56:222]


  override def output: Seq[Attribute] = Seq(
    AttributeReference("result", BooleanType, nullable = false)(),
    AttributeReference("reason", StringType, nullable = false)())

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val pathToCache =
      if (path.nonEmpty) {
        new Path(path.get)
      } else if (table.nonEmpty) {
        DeltaTableIdentifier(sparkSession, table.get) match {
          case Some(id) if id.path.nonEmpty =>
            new Path(id.path.get)
          case _ =>
            new Path(sparkSession.sessionState.catalog.getTableMetadata(table.get).location)
        }
      } else {
        throw DeltaErrors.missingTableIdentifierException("CACHE DATA")
      }

    val baseDeltaPath = DeltaTableUtils.findDeltaTableRoot(sparkSession, pathToCache)
    if (baseDeltaPath.isDefined) {
      if (baseDeltaPath.get != pathToCache) {
        throw DeltaErrors.vacuumBasePathMissingException(baseDeltaPath.get)
      }
    }

    val deltaLog = DeltaLog.forTable(sparkSession, pathToCache)
    if (!deltaLog.tableExists) {
      throw DeltaErrors.notADeltaTableException(
        "CACHE DATA",
        DeltaTableIdentifier(path = Some(pathToCache.toString)))
    }

    val snapshot = deltaLog.update()

    require(
      snapshot.version >= 0,
      "No state defined for this table. Is this really " +
        "a Delta table? Refusing to garbage collect.")

    val allColumns = snapshot.dataSchema.fieldNames.toSeq
    val selectedColumns = if (selectedColumn.nonEmpty) {
      selectedColumn.get
        .filter(allColumns.contains(_))
        .map(ConverterUtils.normalizeColName)
        .toSeq
    } else {
      allColumns.map(ConverterUtils.normalizeColName)
    }

    val selectedAddFiles = if (tsfilter.isDefined) {
      val allParts =
        DeltaAdapter.snapshotFilesForScan(snapshot, Seq.empty, Seq.empty, keepNumRecords = false)
      allParts.files.filter(_.modificationTime >= tsfilter.get.toLong).toSeq
    } else if (partitionColumn.isDefined && partitionValue.isDefined) {
      val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
      require(
        partitionColumns.contains(partitionColumn.get),
        s"the partition column ${partitionColumn.get} is invalid.")
      val partitionColumnField = snapshot.metadata.partitionSchema(partitionColumn.get)

      val partitionColumnAttr = AttributeReference(
        ConverterUtils.normalizeColName(partitionColumn.get),
        partitionColumnField.dataType,
        partitionColumnField.nullable)()
      val isNotNullExpr = IsNotNull(partitionColumnAttr)
      val greaterThanOrEqual = GreaterThanOrEqual(partitionColumnAttr, Literal(partitionValue.get))
      DeltaAdapter
        .snapshotFilesForScan(
          snapshot,
          Seq(partitionColumnAttr),
          Seq(isNotNullExpr, greaterThanOrEqual),
          keepNumRecords = false)
        .files
    } else {
      DeltaAdapter
        .snapshotFilesForScan(snapshot, Seq.empty, Seq.empty, keepNumRecords = false)
        .files
    }

    val executorIdsToAddFiles =
      scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]()
    val executorIdsToParts = scala.collection.mutable.Map[String, String]()
    executorIdsToAddFiles.put(ALL_EXECUTORS, new ArrayBuffer[AddMergeTreeParts]())
    selectedAddFiles.foreach(
      addFile => {
        val mergeTreePart = addFile.asInstanceOf[AddMergeTreeParts]
        val partName = mergeTreePart.name
        val tableUri = URI.create(mergeTreePart.tablePath)
        val relativeTablePath = if (tableUri.getPath.startsWith("/")) {
          tableUri.getPath.substring(1)
        } else tableUri.getPath

        val locations = CHAffinity.getNativeMergeTreePartLocations(partName, relativeTablePath)

        if (locations.isEmpty) {
          // non soft affinity
          executorIdsToAddFiles(ALL_EXECUTORS)
            .append(mergeTreePart)
        } else {
          locations.foreach(
            executor => {
              if (!executorIdsToAddFiles.contains(executor)) {
                executorIdsToAddFiles.put(executor, new ArrayBuffer[AddMergeTreeParts]())
              }
              executorIdsToAddFiles(executor).append(mergeTreePart)
            })
        }
      })

    executorIdsToAddFiles.foreach(
      value => {
        val parts = value._2
        val executorId = value._1
        if (parts.nonEmpty) {
          val onePart = parts(0)
          val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
            onePart.database,
            onePart.table,
            ClickhouseSnapshot.genSnapshotId(snapshot),
            onePart.tablePath,
            pathToCache.toString,
            snapshot.metadata.configuration
              .getOrElse("orderByKey", StorageMeta.DEFAULT_ORDER_BY_KEY),
            snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
            snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
            snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""),
            snapshot.metadata.configuration.getOrElse("setIndexKey", ""),
            snapshot.metadata.configuration.getOrElse("primaryKey", ""),
            PartSerializer.fromPartNames(parts.map(_.name).toSeq),
            snapshot.metadata.schema,
            snapshot.metadata.configuration.asJava,
            new JList[String]()
          )

          executorIdsToParts.put(executorId, extensionTableNode.getExtensionTableStr)
        }
      })
    val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
    if (executorIdsToParts.contains(ALL_EXECUTORS)) {
      // send all parts to all executors
      val tableMessage = executorIdsToParts(ALL_EXECUTORS)
      GlutenDriverEndpoint.executorDataMap.forEach(
        (executorId, executor) => {
          futureList.append(
            (
              executorId,
              executor.executorEndpointRef.ask[CacheJobInfo](
                GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache)
              )))
        })
    } else {
      executorIdsToParts.foreach(
        value => {
          checkExecutorId(value._1)
          val executorData = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
          futureList.append(
            (
              value._1,
              executorData.executorEndpointRef.ask[CacheJobInfo](
                GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava, onlyMetaCache)
              )))
        })
    }

    getResult(futureList, asynExecute)
  }