private def checkScan()

in extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala [53:269]


  /**
   * Walks the scan node of the given logical plan and enforces the watchdog limits,
   * failing the query early instead of letting a huge scan run.
   *
   * Two independent, optional limits are checked:
   *  - `maxScanPartitionsOpt`: maximum number of (Hive / file-source) partitions a scan may read
   *  - `maxFileSizeOpt`: maximum total scan size in bytes
   *
   * Note: all size/partition-count computations below are `lazy val`s guarded by
   * `Option.exists`, which returns `false` on `None` without evaluating its predicate —
   * so the potentially expensive metastore / file-listing calls only happen when the
   * corresponding limit is actually configured.
   *
   * @param plan the logical plan whose leaf scan is inspected (non-scan plans are ignored)
   * @param maxScanPartitionsOpt optional cap on the number of scanned partitions
   * @param maxFileSizeOpt optional cap on the total scanned bytes
   * @throws MaxPartitionExceedException if the partition-count limit is exceeded
   * @throws MaxFileSizeExceedException if the scan-size limit is exceeded
   */
  private def checkScan(
      plan: LogicalPlan,
      maxScanPartitionsOpt: Option[Int],
      maxFileSizeOpt: Option[Long]): Unit = {
    plan match {
      // ---- Hive table scans (HiveTableRelation) ----
      case ScanOperation(_, _, relation: HiveTableRelation) =>
        if (relation.isPartitioned) {
          relation.prunedPartitions match {
            // Partition pruning already happened: check the pruned partition list directly.
            case Some(prunedPartitions) =>
              if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
                throw new MaxPartitionExceedException(
                  s"""
                     |SQL job scan hive partition: ${prunedPartitions.size}
                     |exceed restrict of hive scan maxPartition ${maxScanPartitionsOpt.get}
                     |You should optimize your SQL logical according partition structure
                     |or shorten query scope such as p_date, detail as below:
                     |Table: ${relation.tableMeta.qualifiedName}
                     |Owner: ${relation.tableMeta.owner}
                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
                     |""".stripMargin)
              }
              // Sum catalog statistics of the pruned partitions. Partitions without
              // stats are silently skipped by flatMap(_.stats), so this can
              // under-count the real scan size — NOTE(review): best-effort by design.
              lazy val scanFileSize = prunedPartitions.flatMap(_.stats).map(_.sizeInBytes).sum
              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
                throw partTableMaxFileExceedError(
                  scanFileSize,
                  maxFileSizeOpt.get,
                  Some(relation.tableMeta),
                  prunedPartitions.flatMap(_.storage.locationUri).map(_.toString),
                  relation.partitionCols.map(_.name))
              }
            // No pruning information: treat as a whole-table scan and count every
            // partition of the table via the external catalog (metastore call).
            case _ =>
              lazy val scanPartitions: Int = session
                .sessionState.catalog.externalCatalog.listPartitionNames(
                  relation.tableMeta.database,
                  relation.tableMeta.identifier.table).size
              if (maxScanPartitionsOpt.exists(_ < scanPartitions)) {
                throw new MaxPartitionExceedException(
                  s"""
                     |Your SQL job scan a whole huge table without any partition filter,
                     |You should optimize your SQL logical according partition structure
                     |or shorten query scope such as p_date, detail as below:
                     |Table: ${relation.tableMeta.qualifiedName}
                     |Owner: ${relation.tableMeta.owner}
                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
                     |""".stripMargin)
              }

              // Prefer table-level stats; fall back to summing per-partition stats
              // fetched from the external catalog (partitions lacking stats are skipped).
              lazy val scanFileSize: BigInt =
                relation.tableMeta.stats.map(_.sizeInBytes).getOrElse {
                  session
                    .sessionState.catalog.externalCatalog.listPartitions(
                      relation.tableMeta.database,
                      relation.tableMeta.identifier.table).flatMap(_.stats).map(_.sizeInBytes).sum
                }
              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
                throw new MaxFileSizeExceedException(
                  s"""
                     |Your SQL job scan a whole huge table without any partition filter,
                     |You should optimize your SQL logical according partition structure
                     |or shorten query scope such as p_date, detail as below:
                     |Table: ${relation.tableMeta.qualifiedName}
                     |Owner: ${relation.tableMeta.owner}
                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
                     |""".stripMargin)
              }
          }
        } else {
          // Non-partitioned Hive table: only a size check is meaningful.
          // .sum on the Option yields 0 when table stats are absent (no false positive).
          lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw nonPartTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              Some(relation.tableMeta))
          }
        }
      // ---- File-source scans whose file listing is already in memory ----
      case ScanOperation(
            _,
            filters,
            relation @ LogicalRelation(
              fsRelation @ HadoopFsRelation(
                fileIndex: InMemoryFileIndex,
                partitionSchema,
                _,
                _,
                _,
                _),
              _,
              _,
              _)) =>
        if (fsRelation.partitionSchema.nonEmpty) {
          // Split the pushed-down filters into partition-key filters (used for
          // pruning) and data filters, then list only the surviving partitions.
          val (partitionKeyFilters, dataFilter) =
            getPartitionKeyFiltersAndDataFilters(
              fsRelation.sparkSession,
              relation,
              partitionSchema,
              filters,
              relation.output)
          val prunedPartitions = fileIndex.listFiles(
            partitionKeyFilters.toSeq,
            dataFilter)
          if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
            throw maxPartitionExceedError(
              prunedPartitions.size,
              maxScanPartitionsOpt.get,
              relation.catalogTable,
              fileIndex.rootPaths,
              fsRelation.partitionSchema)
          }
          // Exact size: sum of actual file lengths across the pruned partitions.
          lazy val scanFileSize = prunedPartitions.flatMap(_.files).map(_.getLen).sum
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw partTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              relation.catalogTable,
              fileIndex.rootPaths.map(_.toString),
              fsRelation.partitionSchema.map(_.name))
          }
        } else {
          // Non-partitioned file source: rely on the index's own size estimate.
          lazy val scanFileSize = fileIndex.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw nonPartTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              relation.catalogTable)
          }
        }
      // ---- File-source scans backed by the catalog (listing not yet materialized) ----
      case ScanOperation(
            _,
            filters,
            logicalRelation @ LogicalRelation(
              fsRelation @ HadoopFsRelation(
                catalogFileIndex: CatalogFileIndex,
                partitionSchema,
                _,
                _,
                _,
                _),
              _,
              _,
              _)) =>
        if (fsRelation.partitionSchema.nonEmpty) {
          // Only the partition-key filters matter here; data filters cannot prune
          // catalog partitions, hence the discarded second tuple element.
          val (partitionKeyFilters, _) =
            getPartitionKeyFiltersAndDataFilters(
              fsRelation.sparkSession,
              logicalRelation,
              partitionSchema,
              filters,
              logicalRelation.output)

          // Materialize a pruned file index restricted to the matching partitions.
          val fileIndex = catalogFileIndex.filterPartitions(
            partitionKeyFilters.toSeq)

          lazy val prunedPartitionSize = fileIndex.partitionSpec().partitions.size
          if (maxScanPartitionsOpt.exists(_ < prunedPartitionSize)) {
            throw maxPartitionExceedError(
              prunedPartitionSize,
              maxScanPartitionsOpt.get,
              logicalRelation.catalogTable,
              catalogFileIndex.rootPaths,
              fsRelation.partitionSchema)
          }

          // listFiles(Nil, Nil): no further pruning — list everything the pruned
          // index covers and sum the real file lengths.
          lazy val scanFileSize = fileIndex
            .listFiles(Nil, Nil).flatMap(_.files).map(_.getLen).sum
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw partTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              logicalRelation.catalogTable,
              catalogFileIndex.rootPaths.map(_.toString),
              fsRelation.partitionSchema.map(_.name))
          }
        } else {
          // Non-partitioned: use the catalog index's size estimate directly.
          lazy val scanFileSize = catalogFileIndex.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw nonPartTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              logicalRelation.catalogTable)
          }
        }
      // ---- DataSource V2 scans ----
      // Only the size limit is enforced here (stats-based estimate); there is no
      // partition-count check for V2 relations in this method.
      case ScanOperation(
            _,
            _,
            relation @ DataSourceV2ScanRelation(_, _, _, _)) =>
        val table = relation.relation.table
        if (table.partitioning().nonEmpty) {
          val partitionColumnNames = table.partitioning().map(_.describe())
          val stats = relation.computeStats()
          lazy val scanFileSize = stats.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw new MaxFileSizeExceedException(
              s"""
                 |SQL job scan file size in bytes: $scanFileSize
                 |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
                 |You should optimize your SQL logical according partition structure
                 |or shorten query scope such as p_date, detail as below:
                 |Table: ${table.name()}
                 |Partition Structure: ${partitionColumnNames.mkString(",")}
                 |""".stripMargin)
          }
        } else {
          val stats = relation.computeStats()
          lazy val scanFileSize = stats.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw new MaxFileSizeExceedException(
              s"""
                 |SQL job scan file size in bytes: $scanFileSize
                 |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get}
                 |detail as below:
                 |Table: ${table.name()}
                 |""".stripMargin)
          }
        }
      // Anything that is not a recognized scan shape is deliberately ignored.
      case _ =>
    }
  }