in dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Query.scala [61:155]
// Returns the DataFrame for this query via `buildDataFrame`.
// NOTE(review): presumably yields a fresh logical plan per call — confirm
// `buildDataFrame` is a `def` and not a cached `lazy val`.
def newDataFrame(): DataFrame = buildDataFrame
/**
 * Executes this benchmark query and gathers planning and execution timings.
 *
 * @param includeBreakdown when true, each physical operator in the executed plan is also
 *                         run individually (bottom-up) to record per-operator timings.
 * @param description      unused here; kept for interface compatibility with the base class.
 * @param messages         sink for human-readable breakdown messages (mutated in place).
 * @return a populated [[BenchmarkResult]]; on a non-fatal error, a result carrying only
 *         the query name, mode, and a [[Failure]] describing the exception.
 */
override protected def doBenchmark(
    includeBreakdown: Boolean,
    description: String = "",
    messages: ArrayBuffer[String]): BenchmarkResult = {
  try {
    val dataFrame = buildDataFrame
    val queryExecution = dataFrame.queryExecution
    // Force physical planning so the tracker phases below are all populated.
    queryExecution.executedPlan
    val phases = queryExecution.tracker.phases
    val parsingTime = phases(QueryPlanningTracker.PARSING).durationMs.toDouble
    val analysisTime = phases(QueryPlanningTracker.ANALYSIS).durationMs.toDouble
    val optimizationTime = phases(QueryPlanningTracker.OPTIMIZATION).durationMs.toDouble
    val planningTime = phases(QueryPlanningTracker.PLANNING).durationMs.toDouble

    val breakdownResults =
      if (includeBreakdown) {
        val depth = queryExecution.executedPlan.collect { case p: SparkPlan => p }.size
        val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan.p(i)))
        val indexMap = physicalOperators.map { case (index, op) => (op, index) }.toMap
        val timeMap = new mutable.HashMap[Int, Double]
        val maxFields = 999 // Maximum number of fields that will be converted to strings
        // Execute operators bottom-up (reverse order) so each child's time is already
        // in timeMap when the parent's exclusive (delta) time is computed.
        physicalOperators.reverse.map {
          case (index, node) =>
            messages += s"Breakdown: ${node.simpleString(maxFields)}"
            // Re-plan the query for each operator — presumably to avoid reusing
            // state from previously executed subtrees; confirm against Spark internals.
            val newNode = buildDataFrame.queryExecution.executedPlan.p(index)
            val executionTime = measureTimeMs {
              // Materialize every row and discard it. The original lambda returned
              // the `Unit` companion object (`=> Unit`), a classic Scala pitfall;
              // `()` is the intended unit value.
              newNode.execute().foreach(_ => ())
            }
            timeMap += ((index, executionTime))
            val childIndexes = node.children.map(indexMap)
            val childTime = childIndexes.map(timeMap).sum
            messages += s"Breakdown time: $executionTime (+${executionTime - childTime})"
            BreakdownResult(
              node.nodeName,
              node.simpleString(1000).replaceAll("#\\d+", ""),
              index,
              childIndexes,
              executionTime,
              executionTime - childTime)
        }
      } else {
        Seq.empty[BreakdownResult]
      }

    // The executionTime for the entire query includes the time of type conversion from catalyst
    // to scala.
    // Note: queryExecution.{logical, analyzed, optimizedPlan, executedPlan} has been already
    // lazily evaluated above, so below we will count only execution time.
    var result: Option[Long] = None
    val executionTime = measureTimeMs {
      executionMode match {
        case ExecutionMode.CollectResults => dataFrame.collect()
        case ExecutionMode.ForeachResults => dataFrame.foreach { _ => () }
        case ExecutionMode.WriteParquet(location) =>
          dataFrame.write.parquet(s"$location/$name.parquet")
        case ExecutionMode.HashResults =>
          // SELECT SUM(CRC32(CONCAT_WS(", ", *))) FROM (benchmark query)
          val row =
            dataFrame
              .selectExpr("sum(crc32(concat_ws(',', *)))")
              .head()
          result = if (row.isNullAt(0)) None else Some(row.getLong(0))
      }
    }

    val joinTypes = dataFrame.queryExecution.executedPlan.collect {
      case k if k.nodeName contains "Join" => k.nodeName
    }

    BenchmarkResult(
      name = name,
      mode = executionMode.toString,
      joinTypes = joinTypes,
      tables = tablesInvolved,
      parsingTime = parsingTime,
      analysisTime = analysisTime,
      optimizationTime = optimizationTime,
      planningTime = planningTime,
      executionTime = executionTime,
      result = result,
      queryExecution = dataFrame.queryExecution.toString,
      breakDown = breakdownResults)
  } catch {
    // NonFatal rather than Exception: lets InterruptedException and truly fatal
    // errors propagate, while still recording any non-fatal failure as a result.
    case scala.util.control.NonFatal(e) =>
      BenchmarkResult(
        name = name,
        mode = executionMode.toString,
        failure = Failure(e.getClass.getName, e.getMessage))
  }
}