in src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala [295:388]
private def collectInternal(df: DataFrame, rowType: RelDataType): (java.lang.Iterable[java.util.List[String]], Int) = logTime("collectInternal", debug = true) {
  val jobGroup = Thread.currentThread().getName
  val sparkContext = SparderEnv.getSparkSession.sparkContext
  val kapConfig = KapConfig.getInstanceFromEnv
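  // Pick the shuffle partition count: use the configured value when set (!= -1),
  // otherwise estimate one partition per PARTITION_SPLIT_BYTES of scanned data, capped at the total cores.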
  val partitionsNum =
    if (kapConfig.getSparkSqlShufflePartitions != -1) {
      kapConfig.getSparkSqlShufflePartitions
    } else {
      Math.min(QueryContext.current().getMetrics.getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
        SparderEnv.getTotalCore).toInt
    }
  QueryContext.current().setShufflePartitions(partitionsNum)
  logInfo(s"partitions num are: $partitionsNum," +
    s" total scan bytes are: ${QueryContext.current().getMetrics.getSourceScanBytes}," +
    s" total cores are: ${SparderEnv.getTotalCore}")
  val queryId = QueryContext.current().getQueryId
  sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
  sparkContext.setJobGroup(jobGroup,
    QueryContext.current().getMetrics.getCorrectedSql,
    interruptOnCancel = true)
  try {
    val autoBroadcastJoinThreshold = SparderEnv.getSparkSession.sessionState.conf.autoBroadcastJoinThreshold
    val sparkPlan = df.queryExecution.executedPlan
    var sumOfSourceScanRows = QueryContext.current.getMetrics.getAccumSourceScanRows
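    // When limit push-down is enabled, re-estimate the source scan row count from the per-stage limits in the executed plan.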
    if (KapConfig.getInstanceFromEnv.isQueryLimitEnabled && KapConfig.getInstanceFromEnv.isApplyLimitInfoToSourceScanRowsEnabled) {
      val accumRowsCounter = new AtomicLong(0)
      extractEachStageLimitRows(sparkPlan, -1, accumRowsCounter)
      sumOfSourceScanRows = accumRowsCounter.get()
      logDebug(s"Spark executed plan is \n $sparkPlan; \n accumRowsCounter: $accumRowsCounter")
    }
    logInfo(s"autoBroadcastJoinThreshold: [before:$autoBroadcastJoinThreshold, " +
      s"after: ${SparderEnv.getSparkSession.sessionState.conf.autoBroadcastJoinThreshold}]")
    sparkContext.setLocalProperty("source_scan_rows", QueryContext.current().getMetrics.getSourceScanRows.toString)
    logDebug(s"source_scan_rows is ${QueryContext.current().getMetrics.getSourceScanRows.toString}")
    val bigQueryThreshold = BigQueryThresholdUpdater.getBigQueryThreshold
    val pool = getQueryFairSchedulerPool(sparkContext.getConf, QueryContext.current(), bigQueryThreshold,
      sumOfSourceScanRows, partitionsNum)
    sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL, pool)
    // judge whether to refuse the new big query
    logDebug(s"Total source scan rows: $sumOfSourceScanRows")
    val sourceScanRows = Array(new lang.Long(sumOfSourceScanRows)).toList.asJava
    val ifBigQuery: Boolean = QueryContext.current().isIfBigQuery
    ifRefuseQuery(sumOfSourceScanRows, bigQueryThreshold, sourceScanRows, ifBigQuery)
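    // Mark the executed plan as ready in the query trace and start tracking the Spark jobs spawned by this query.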
    QueryContext.current.record("executed_plan")
    QueryContext.currentTrace().endLastSpan()
    val jobTrace = new SparkJobTrace(jobGroup, QueryContext.currentTrace(), QueryContext.current().getQueryId, sparkContext)
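    // Collect the result through the iterable collect API or a plain iterator, depending on the project-level configuration.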
    val results = if (NProjectManager.getProjectConfig(QueryContext.current().getProject)
      .isQueryUseIterableCollectApi) {
      df.collectToIterator()
    } else {
      df.toIterator()
    }
    val resultRows = results._1
    val resultSize = results._2
    if (kapConfig.isQuerySparkJobTraceEnabled) jobTrace.jobFinished()
    QueryContext.current.record("collect_result")
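    // Gather scan, job/stage/task and CPU metrics of the finished query and record them in the QueryContext.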
    val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
    val (jobCount, stageCount, taskCount) = QueryMetricUtils.collectTaskRelatedMetrics(jobGroup, sparkContext)
    val cpuTime = QueryCostCollector.getAndCleanStatus(QueryContext.current().getQueryId)
    QueryContext.current().getMetrics.setScanRows(scanRows)
    QueryContext.current().getMetrics.setScanBytes(scanBytes)
    QueryContext.current().getMetrics.setQueryJobCount(jobCount)
    QueryContext.current().getMetrics.setQueryStageCount(stageCount)
    QueryContext.current().getMetrics.setQueryTaskCount(taskCount)
    QueryContext.current().getMetrics.setCpuTime(cpuTime)
    logInfo(s"Actual total scan count: $scanRows, " +
      s"file scan row count: ${QueryContext.current.getMetrics.getAccumSourceScanRows}, " +
      s"may apply limit row count: $sumOfSourceScanRows, Big query threshold: $bigQueryThreshold, Allocate pool: $pool, " +
      s"Is Vip: ${QueryContext.current().getQueryTagInfo.isHighPriorityQuery}, " +
      s"Is TableIndex: ${QueryContext.current().getQueryTagInfo.isTableIndex}")
    val resultTypes = rowType.getFieldList.asScala
    (readResultRow(resultRows, resultTypes), resultSize)
  } catch {
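    // On interruption, restore the interrupt flag and cancel the Spark job group before rethrowing.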
    case e: Throwable =>
      if (e.isInstanceOf[InterruptedException]) {
        Thread.currentThread.interrupt()
        sparkContext.cancelJobGroup(jobGroup)
        QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in ResultPlan.",
          "Current step: Collecting dataset for sparder.")
      }
      throw e
  } finally {
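    // Always record on the QueryContext the Spark execution id registered for this query id, even when the query fails.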
    QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(queryId))
  }
}