def newDataFrame()

in dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Query.scala [61:155]
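
The listing relies on members inherited from the surrounding benchmark machinery that are not shown here, notably buildDataFrame (which presumably rebuilds the query's DataFrame from its SQL text) and measureTimeMs (a wall-clock timer returning milliseconds as a Double). A minimal sketch of the timer, assuming it simply brackets the block with System.nanoTime; the real helper is defined in the enclosing trait and may differ:

def measureTimeMs[A](f: => A): Double = {
  val startTime = System.nanoTime()
  f // evaluate the by-name block once
  val endTime = System.nanoTime()
  (endTime - startTime).toDouble / 1000000 // nanoseconds -> milliseconds
}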


  def newDataFrame(): DataFrame = buildDataFrame

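  /**
   * Runs the query once, recording planning-phase durations from Spark's
   * QueryPlanningTracker and, when `includeBreakdown` is set, a per-operator
   * execution time breakdown of the physical plan.
   */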
  override protected def doBenchmark(
      includeBreakdown: Boolean,
      description: String = "",
      messages: ArrayBuffer[String]): BenchmarkResult = {
    try {
      val dataFrame = buildDataFrame
      val queryExecution = dataFrame.queryExecution
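      // Touching executedPlan forces the whole lazy planning pipeline, so the
      // tracker phases read below are all populated.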
      queryExecution.executedPlan
      val phases = queryExecution.tracker.phases
      val parsingTime = phases(QueryPlanningTracker.PARSING).durationMs.toDouble
      val analysisTime = phases(QueryPlanningTracker.ANALYSIS).durationMs.toDouble
      val optimizationTime = phases(QueryPlanningTracker.OPTIMIZATION).durationMs.toDouble
      val planningTime = phases(QueryPlanningTracker.PLANNING).durationMs.toDouble

      val breakdownResults =
        if (includeBreakdown) {
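          // "depth" is really the number of physical operators in the plan; each
          // operator is indexed so its subtree can be timed individually below.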
          val depth = queryExecution.executedPlan.collect { case p: SparkPlan => p }.size
          val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan.p(i)))
          val indexMap = physicalOperators.map { case (index, op) => (op, index) }.toMap
          val timeMap = new mutable.HashMap[Int, Double]
          val maxFields = 999 // Maximum number of fields that will be converted to strings

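          // Walk the operators bottom-up: execute each subtree on a freshly built
          // plan, then subtract the children's measured time to isolate the cost
          // attributable to this operator alone.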
          physicalOperators.reverse.map {
            case (index, node) =>
              messages += s"Breakdown: ${node.simpleString(maxFields)}"
              val newNode = buildDataFrame.queryExecution.executedPlan.p(index)
              val executionTime = measureTimeMs {
                newNode.execute().foreach((_: Any) => ())
              }
              timeMap += ((index, executionTime))

              val childIndexes = node.children.map(indexMap)
              val childTime = childIndexes.map(timeMap).sum
              messages += s"Breakdown time: $executionTime (+${executionTime - childTime})"

              BreakdownResult(
                node.nodeName,
                node.simpleString(maxFields).replaceAll("#\\d+", ""),
                index,
                childIndexes,
                executionTime,
                executionTime - childTime)
          }
        } else {
          Seq.empty[BreakdownResult]
        }

      // The executionTime for the entire query includes the time spent converting
      // results from Catalyst's internal rows to Scala values.
      // Note: queryExecution.{logical, analyzed, optimizedPlan, executedPlan} have
      // already been evaluated above, so the block below measures execution time only.
      var result: Option[Long] = None
      val executionTime = measureTimeMs {
        executionMode match {
          case ExecutionMode.CollectResults => dataFrame.collect()
          case ExecutionMode.ForeachResults => dataFrame.foreach(_ => ())
          case ExecutionMode.WriteParquet(location) =>
            dataFrame.write.parquet(s"$location/$name.parquet")
          case ExecutionMode.HashResults =>
            // SELECT SUM(CRC32(CONCAT_WS(',', *))) FROM (benchmark query)
            val row =
              dataFrame
                .selectExpr("sum(crc32(concat_ws(',', *)))")
                .head()
            result = if (row.isNullAt(0)) None else Some(row.getLong(0))
        }
      }

      val joinTypes = dataFrame.queryExecution.executedPlan.collect {
        case k if k.nodeName contains "Join" => k.nodeName
      }

      BenchmarkResult(
        name = name,
        mode = executionMode.toString,
        joinTypes = joinTypes,
        tables = tablesInvolved,
        parsingTime = parsingTime,
        analysisTime = analysisTime,
        optimizationTime = optimizationTime,
        planningTime = planningTime,
        executionTime = executionTime,
        result = result,
        queryExecution = dataFrame.queryExecution.toString,
        breakDown = breakdownResults)
    } catch {
      case e: Exception =>
        BenchmarkResult(
          name = name,
          mode = executionMode.toString,
          failure = Failure(e.getClass.getName, e.getMessage))
    }
  }
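
For context, the phase timings collected above come from Spark's QueryPlanningTracker, which works on any DataFrame. A standalone sketch of the same extraction, assuming an active SparkSession named spark (hypothetical here):

val df = spark.sql("SELECT count(*) FROM range(10)")
df.queryExecution.executedPlan // force full planning so every phase is recorded
df.queryExecution.tracker.phases.foreach { case (phase, summary) =>
  println(s"$phase: ${summary.durationMs} ms")
}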