private def collectInternal()

in src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala [295:388]


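  // Collects the result of the given DataFrame for the current query: sizes the shuffle,
  // registers the job group with the Spark scheduler, collects the rows, records query metrics,
  // and returns the rows as string lists together with the result size.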
  private def collectInternal(df: DataFrame, rowType: RelDataType): (java.lang.Iterable[java.util.List[String]], Int) =
    logTime("collectInternal", debug = true) {
    val jobGroup = Thread.currentThread().getName
    val sparkContext = SparderEnv.getSparkSession.sparkContext
    val kapConfig = KapConfig.getInstanceFromEnv
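    // Shuffle partitions: use the configured value if set, otherwise derive it from the estimated
    // source scan bytes (one partition per PARTITION_SPLIT_BYTES), capped at the total core count.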
    val partitionsNum =
      if (kapConfig.getSparkSqlShufflePartitions != -1) {
        kapConfig.getSparkSqlShufflePartitions
      } else {
        Math.min(QueryContext.current().getMetrics.getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
          SparderEnv.getTotalCore).toInt
      }
    QueryContext.current().setShufflePartitions(partitionsNum)
    logInfo(s"partitions num are: $partitionsNum," +
      s" total scan bytes are: ${QueryContext.current().getMetrics.getSourceScanBytes}," +
      s" total cores are: ${SparderEnv.getTotalCore}")

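    // Propagate the query id via a Spark local property so the execution can later be mapped back to this query.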
    val queryId = QueryContext.current().getQueryId
    sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)

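    // Group this query's Spark jobs under the thread name, tagged with the corrected SQL,
    // so they can be cancelled together.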
    sparkContext.setJobGroup(jobGroup,
      QueryContext.current().getMetrics.getCorrectedSql,
      interruptOnCancel = true)
    try {
      val autoBroadcastJoinThreshold = SparderEnv.getSparkSession.sessionState.conf.autoBroadcastJoinThreshold
      val sparkPlan = df.queryExecution.executedPlan
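      // When limit info is applied to source scan rows, re-estimate the scanned rows
      // from the per-stage limits found in the executed plan.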
      var sumOfSourceScanRows = QueryContext.current.getMetrics.getAccumSourceScanRows
      if (KapConfig.getInstanceFromEnv.isQueryLimitEnabled && KapConfig.getInstanceFromEnv.isApplyLimitInfoToSourceScanRowsEnabled) {
        val accumRowsCounter = new AtomicLong(0)
        extractEachStageLimitRows(sparkPlan, -1, accumRowsCounter)
        sumOfSourceScanRows = accumRowsCounter.get()
        logDebug(s"Spark executed plan is \n $sparkPlan; \n accumRowsCounter: $accumRowsCounter")
      }
      logInfo(s"autoBroadcastJoinThreshold: [before:$autoBroadcastJoinThreshold, " +
        s"after: ${SparderEnv.getSparkSession.sessionState.conf.autoBroadcastJoinThreshold}]")
      sparkContext.setLocalProperty("source_scan_rows", QueryContext.current().getMetrics.getSourceScanRows.toString)
      logDebug(s"source_scan_rows is ${QueryContext.current().getMetrics.getSourceScanRows.toString}")

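      // Choose the fair scheduler pool for this query from the big query threshold,
      // the estimated scan rows and the partition count.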
      val bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold
      val pool = getQueryFairSchedulerPool(sparkContext.getConf, QueryContext.current(), bigQueryThreshold,
        sumOfSourceScanRows, partitionsNum)
      sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL, pool)

      // judge whether to refuse the new big query
      logDebug(s"Total source scan rows: $sumOfSourceScanRows")
      val sourceScanRows = Array(lang.Long.valueOf(sumOfSourceScanRows)).toList.asJava
      val ifBigQuery: Boolean = QueryContext.current().isIfBigQuery
      ifRefuseQuery(sumOfSourceScanRows, bigQueryThreshold, sourceScanRows, ifBigQuery)

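      // Record the executed plan, start tracing the Spark jobs, then collect the result either via
      // the iterable collect API or a plain iterator, depending on the project config.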
      QueryContext.current.record("executed_plan")
      QueryContext.currentTrace().endLastSpan()
      val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace(), QueryContext.current().getQueryId, sparkContext)
      val results = if (NProjectManager.getProjectConfig(QueryContext.current().getProject)
        .isQueryUseIterableCollectApi) {
        df.collectToIterator()
      } else {
        df.toIterator()
      }
      val resultRows = results._1
      val resultSize = results._2
      if (kapConfig.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()
      QueryContext.current.record("collect_result")

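      // Gather scan, job/stage/task and CPU time metrics for this query and store them in the QueryContext.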
      val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
      val (jobCount, stageCount, taskCount) = QueryMetricUtils.collectTaskRelatedMetrics(jobGroup, sparkContext)
      val cpuTime = QueryCostCollector.getAndCleanStatus(QueryContext.current().getQueryId)
      QueryContext.current().getMetrics.setScanRows(scanRows)

      QueryContext.current().getMetrics.setScanBytes(scanBytes)
      QueryContext.current().getMetrics.setQueryJobCount(jobCount)
      QueryContext.current().getMetrics.setQueryStageCount(stageCount)
      QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
      QueryContext.current().getMetrics.setCpuTime(cpuTime)

      logInfo(s"Actual total scan count: $scanRows, " +
        s"file scan row count: ${QueryContext.current.getMetrics.getAccumSourceScanRows}, " +
        s"may apply limit row count: $sumOfSourceScanRows, Big query threshold: $bigQueryThreshold, Allocate pool: $pool, " +
        s"Is Vip: ${QueryContext.current().getQueryTagInfo.isHighPriorityQuery}, " +
        s"Is TableIndex: ${QueryContext.current().getQueryTagInfo.isTableIndex}")

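      // Convert the collected rows to string lists according to the field types of the Calcite row type.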
      val resultTypes = rowType.getFieldList.asScala
      (readResultRow(resultRows, resultTypes), resultSize)
    } catch {
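      // On interruption, restore the interrupt flag and cancel the whole job group before rethrowing.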
      case e: Throwable =>
        if (e.isInstanceOf[InterruptedException]) {
          Thread.currentThread.interrupt()
          sparkContext.cancelJobGroup(jobGroup)
          QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in ResultPlan.",
            "Current step: Collecting dataset for sparder.")
        }
        throw e
    } finally {
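      // Always attach the Spark execution id resolved for this query id, even when collection fails.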
      QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(queryId))
    }
  }