in extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala [53:269]
  private def checkScan(
      plan: LogicalPlan,
      maxScanPartitionsOpt: Option[Int],
      maxFileSizeOpt: Option[Long]): Unit = {
    plan match {
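      // Hive table scans: use catalog metadata (pruned partitions or table stats)
      // to estimate how many partitions / bytes the query would read.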
      case ScanOperation(_, _, relation: HiveTableRelation) =>
        if (relation.isPartitioned) {
          relation.prunedPartitions match {
            case Some(prunedPartitions) =>
              if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
                throw new MaxPartitionExceedException(
                  s"""
                     |SQL job scans ${prunedPartitions.size} hive partitions,
                     |which exceeds the configured hive scan maxPartition limit of ${maxScanPartitionsOpt.get}.
                     |You should optimize your SQL logic according to the partition structure
                     |or narrow the query scope (e.g. by p_date). Details below:
                     |Table: ${relation.tableMeta.qualifiedName}
                     |Owner: ${relation.tableMeta.owner}
                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
                     |""".stripMargin)
              }
              lazy val scanFileSize = prunedPartitions.flatMap(_.stats).map(_.sizeInBytes).sum
              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
                throw partTableMaxFileExceedError(
                  scanFileSize,
                  maxFileSizeOpt.get,
                  Some(relation.tableMeta),
                  prunedPartitions.flatMap(_.storage.locationUri).map(_.toString),
                  relation.partitionCols.map(_.name))
              }
            case _ =>
              lazy val scanPartitions: Int = session
                .sessionState.catalog.externalCatalog.listPartitionNames(
                  relation.tableMeta.database,
                  relation.tableMeta.identifier.table).size
              if (maxScanPartitionsOpt.exists(_ < scanPartitions)) {
                throw new MaxPartitionExceedException(
                  s"""
                     |Your SQL job scans a whole huge table without any partition filter.
                     |You should optimize your SQL logic according to the partition structure
                     |or narrow the query scope (e.g. by p_date). Details below:
                     |Table: ${relation.tableMeta.qualifiedName}
                     |Owner: ${relation.tableMeta.owner}
                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
                     |""".stripMargin)
              }
              lazy val scanFileSize: BigInt =
                relation.tableMeta.stats.map(_.sizeInBytes).getOrElse {
                  session
                    .sessionState.catalog.externalCatalog.listPartitions(
                      relation.tableMeta.database,
                      relation.tableMeta.identifier.table).flatMap(_.stats).map(_.sizeInBytes).sum
                }
              if (maxFileSizeOpt.exists(_ < scanFileSize)) {
                throw new MaxFileSizeExceedException(
                  s"""
                     |Your SQL job scans a whole huge table without any partition filter.
                     |You should optimize your SQL logic according to the partition structure
                     |or narrow the query scope (e.g. by p_date). Details below:
                     |Table: ${relation.tableMeta.qualifiedName}
                     |Owner: ${relation.tableMeta.owner}
                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
                     |""".stripMargin)
              }
          }
        } else {
          lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw nonPartTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              Some(relation.tableMeta))
          }
        }
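      // File-source scans whose partitions are already cached in an InMemoryFileIndex:
      // prune with the extracted partition filters and measure the surviving partitions and files.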
      case ScanOperation(
          _,
          filters,
          relation @ LogicalRelation(
            fsRelation @ HadoopFsRelation(
              fileIndex: InMemoryFileIndex,
              partitionSchema,
              _,
              _,
              _,
              _),
            _,
            _,
            _)) =>
        if (fsRelation.partitionSchema.nonEmpty) {
          val (partitionKeyFilters, dataFilter) =
            getPartitionKeyFiltersAndDataFilters(
              fsRelation.sparkSession,
              relation,
              partitionSchema,
              filters,
              relation.output)
          val prunedPartitions = fileIndex.listFiles(
            partitionKeyFilters.toSeq,
            dataFilter)
          if (maxScanPartitionsOpt.exists(_ < prunedPartitions.size)) {
            throw maxPartitionExceedError(
              prunedPartitions.size,
              maxScanPartitionsOpt.get,
              relation.catalogTable,
              fileIndex.rootPaths,
              fsRelation.partitionSchema)
          }
          lazy val scanFileSize = prunedPartitions.flatMap(_.files).map(_.getLen).sum
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw partTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              relation.catalogTable,
              fileIndex.rootPaths.map(_.toString),
              fsRelation.partitionSchema.map(_.name))
          }
        } else {
          lazy val scanFileSize = fileIndex.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw nonPartTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              relation.catalogTable)
          }
        }
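      // File-source scans still backed by a CatalogFileIndex: ask the catalog to filter
      // partitions first, then count the remaining partitions and sum their file sizes.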
      case ScanOperation(
          _,
          filters,
          logicalRelation @ LogicalRelation(
            fsRelation @ HadoopFsRelation(
              catalogFileIndex: CatalogFileIndex,
              partitionSchema,
              _,
              _,
              _,
              _),
            _,
            _,
            _)) =>
        if (fsRelation.partitionSchema.nonEmpty) {
          val (partitionKeyFilters, _) =
            getPartitionKeyFiltersAndDataFilters(
              fsRelation.sparkSession,
              logicalRelation,
              partitionSchema,
              filters,
              logicalRelation.output)
          val fileIndex = catalogFileIndex.filterPartitions(
            partitionKeyFilters.toSeq)
          lazy val prunedPartitionSize = fileIndex.partitionSpec().partitions.size
          if (maxScanPartitionsOpt.exists(_ < prunedPartitionSize)) {
            throw maxPartitionExceedError(
              prunedPartitionSize,
              maxScanPartitionsOpt.get,
              logicalRelation.catalogTable,
              catalogFileIndex.rootPaths,
              fsRelation.partitionSchema)
          }
          lazy val scanFileSize = fileIndex
            .listFiles(Nil, Nil).flatMap(_.files).map(_.getLen).sum
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw partTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              logicalRelation.catalogTable,
              catalogFileIndex.rootPaths.map(_.toString),
              fsRelation.partitionSchema.map(_.name))
          }
        } else {
          lazy val scanFileSize = catalogFileIndex.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw nonPartTableMaxFileExceedError(
              scanFileSize,
              maxFileSizeOpt.get,
              logicalRelation.catalogTable)
          }
        }
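      // DataSource V2 scans: no partition-count check is possible here, so only the
      // estimated scan size from computeStats() is validated against maxFileSize.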
      case ScanOperation(
          _,
          _,
          relation @ DataSourceV2ScanRelation(_, _, _, _)) =>
        val table = relation.relation.table
        if (table.partitioning().nonEmpty) {
          val partitionColumnNames = table.partitioning().map(_.describe())
          val stats = relation.computeStats()
          lazy val scanFileSize = stats.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw new MaxFileSizeExceedException(
              s"""
                 |SQL job scans $scanFileSize bytes,
                 |which exceeds the configured table scan maxFileSize limit of ${maxFileSizeOpt.get}.
                 |You should optimize your SQL logic according to the partition structure
                 |or narrow the query scope (e.g. by p_date). Details below:
                 |Table: ${table.name()}
                 |Partition Structure: ${partitionColumnNames.mkString(", ")}
                 |""".stripMargin)
          }
        } else {
          val stats = relation.computeStats()
          lazy val scanFileSize = stats.sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            throw new MaxFileSizeExceedException(
              s"""
                 |SQL job scans $scanFileSize bytes,
                 |which exceeds the configured table scan maxFileSize limit of ${maxFileSizeOpt.get}.
                 |Details below:
                 |Table: ${table.name()}
                 |""".stripMargin)
          }
        }
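      // Any other plan shape is not a scan leaf this strategy knows how to bound; nothing to check.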
      case _ =>
    }
  }
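  // Usage sketch (illustrative only, not part of this file): the two thresholds checked
  // above are expected to come from the watchdog configuration, e.g. assuming the
  // standard Kyuubi watchdog keys:
  //
  //   spark.conf.set("spark.sql.watchdog.maxPartitions", "2000")
  //   spark.conf.set("spark.sql.watchdog.maxFileSize", "100g")
  //   spark.sql("SELECT * FROM big_partitioned_table")   // no partition filter
  //
  // A plan whose pruned scan exceeds either limit fails fast with
  // MaxPartitionExceedException or MaxFileSizeExceedException instead of launching the job.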